1
0
Fork 0

Added Effective Closeness

This commit is contained in:
Simon Grätzer 2017-02-17 03:41:51 +01:00
parent ddee4c3619
commit 18fb10e2f7
21 changed files with 547 additions and 127 deletions

View File

@ -373,6 +373,8 @@ SET(ARANGOD_SOURCES
Pregel/Algos/SCC.cpp
Pregel/Algos/AsyncSCC.cpp
# Pregel/Algos/DMID.cpp
Pregel/Algos/EffectiveCloseness/EffectiveCloseness.cpp
Pregel/Algos/EffectiveCloseness/HLLCounter.cpp
Pregel/Conductor.cpp
Pregel/GraphStore.cpp
Pregel/IncomingCache.cpp

View File

@ -75,7 +75,7 @@ void AggregatorHandler::aggregateValues(AggregatorHandler const& workerValues) {
}
}
void AggregatorHandler::aggregateValues(VPackSlice const& workerValues) {
void AggregatorHandler::aggregateValues(VPackSlice const& workerValues) {
VPackSlice values = workerValues.get(Utils::aggregatorValuesKey);
if (values.isObject()) {
for (auto const& keyValue : VPackObjectIterator(values)) {

View File

@ -23,6 +23,7 @@
#include "Pregel/AlgoRegistry.h"
#include "Pregel/Algos/AsyncSCC.h"
#include "Pregel/Algos/ConnectedComponents.h"
#include "Pregel/Algos/EffectiveCloseness/EffectiveCloseness.h"
#include "Pregel/Algos/LineRank.h"
#include "Pregel/Algos/PageRank.h"
#include "Pregel/Algos/RecoveringPageRank.h"
@ -46,6 +47,8 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm,
return new algos::ShortestPathAlgorithm(userParams);
} else if (algorithm == "linerank") {
return new algos::LineRank(userParams);
} else if (algorithm == "effectivecloseness") {
return new algos::EffectiveCloseness(userParams);
} else if (algorithm == "connectedcomponents") {
return new algos::ConnectedComponents(userParams);
} else if (algorithm == "scc") {
@ -89,6 +92,9 @@ IWorker* AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) {
body);
} else if (algorithm == "linerank") {
return createWorker(vocbase, new algos::LineRank(userParams), body);
} else if (algorithm == "effectivecloseness") {
return createWorker(vocbase, new algos::EffectiveCloseness(userParams),
body);
} else if (algorithm == "connectedcomponents") {
return createWorker(vocbase, new algos::ConnectedComponents(userParams),
body);

View File

@ -0,0 +1,138 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "EffectiveCloseness.h"
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
#include "Pregel/Aggregator.h"
#include "Pregel/Algorithm.h"
#include "Pregel/Algos/EffectiveCloseness/HLLCounterFormat.h"
#include "Pregel/GraphStore.h"
#include "Pregel/IncomingCache.h"
#include "Pregel/MasterContext.h"
#include "Pregel/VertexComputation.h"
using namespace arangodb;
using namespace arangodb::pregel;
using namespace arangodb::pregel::algos;
MessageFormat<HLLCounter>* EffectiveCloseness::messageFormat() const {
return new HLLCounterFormat();
}
MessageCombiner<HLLCounter>* EffectiveCloseness::messageCombiner() const {
return new HLLCounterCombiner();
}
struct ECComputation : public VertexComputation<ECValue, int32_t, HLLCounter> {
ECComputation() {}
void compute(MessageIterator<HLLCounter> const& messages) override {
ECValue* value = mutableVertexData();
if (globalSuperstep() == 0) {
value->counter.addNode(pregelId());
}
int32_t seenCountBefore = value->counter.getCount();
for (HLLCounter const* inCounter : messages) {
value->counter.merge(*inCounter);
}
int32_t seenCountAfter = value->counter.getCount();
if ((seenCountBefore != seenCountAfter) || (globalSuperstep() == 0)) {
sendMessageToAllEdges(value->counter);
}
// determine last iteration for which we set a value,
// we need to copy this to all iterations up to this one
// because the number of reachable vertices stays the same
// when the compute method is not invoked
if (value->shortestPaths.size() < globalSuperstep()) {
uint32_t i = value->shortestPaths.size();
int32_t numReachable = value->shortestPaths.back();
for (; i < globalSuperstep(); i++) {
value->shortestPaths.push_back(numReachable);
}
}
// subtract 1 because our own bit is counted as well
if (value->shortestPaths.size() > globalSuperstep()) {
value->shortestPaths[globalSuperstep()] = seenCountAfter - 1;
} else {
value->shortestPaths.push_back(seenCountAfter - 1);
}
voteHalt();
}
};
VertexComputation<ECValue, int32_t, HLLCounter>*
EffectiveCloseness::createComputation(WorkerConfig const*) const {
return new ECComputation();
}
struct ECGraphFormat : public GraphFormat<ECValue, int32_t> {
const std::string _resultField;
ECGraphFormat(std::string const& result) : _resultField(result) {}
size_t estimatedEdgeSize() const override { return 0; };
size_t copyVertexData(std::string const& documentId,
arangodb::velocypack::Slice document,
ECValue* targetPtr, size_t maxSize) override {
return sizeof(ECValue);
}
size_t copyEdgeData(arangodb::velocypack::Slice document, int32_t* targetPtr,
size_t maxSize) override {
return 0;
}
bool buildVertexDocument(arangodb::velocypack::Builder& b, const ECValue* ptr,
size_t size) const override {
int32_t numVerticesReachable = 0;
int32_t sumLengths = 0;
for (size_t i = 1; i < ptr->shortestPaths.size(); i++) {
int32_t newlyReachable =
ptr->shortestPaths[i] - ptr->shortestPaths[i - 1];
sumLengths += i * newlyReachable;
if (ptr->shortestPaths[i] > numVerticesReachable) {
numVerticesReachable = ptr->shortestPaths[i];
}
}
double closeness = 0.0;
if (numVerticesReachable > 0) {
closeness = (double)sumLengths / (double)numVerticesReachable;
}
b.add(_resultField, VPackValue(closeness));
return true;
}
bool buildEdgeDocument(arangodb::velocypack::Builder& b, const int32_t* ptr,
size_t size) const override {
return false;
}
};
GraphFormat<ECValue, int32_t>* EffectiveCloseness::inputFormat() const {
return new ECGraphFormat(_resultField);
}

View File

@ -0,0 +1,52 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_ALGOS_EFFECTIVE_CLSNESS_H
#define ARANGODB_PREGEL_ALGOS_EFFECTIVE_CLSNESS_H 1
#include "Pregel/Algorithm.h"
#include "Pregel/CommonFormats.h"
namespace arangodb {
namespace pregel {
namespace algos {
/// Effective Closeness
struct EffectiveCloseness : public SimpleAlgorithm<ECValue, int32_t, HLLCounter> {
EffectiveCloseness(VPackSlice params)
: SimpleAlgorithm<ECValue, int32_t, HLLCounter>("EffectiveCloseness", params) {}
GraphFormat<ECValue, int32_t>* inputFormat() const override;
MessageFormat<HLLCounter>* messageFormat() const override;
MessageCombiner<HLLCounter>* messageCombiner() const override;
VertexComputation<ECValue, int32_t, HLLCounter>*
createComputation(WorkerConfig const*) const override;
uint64_t maxGlobalSuperstep() const override { return 1000; }
};
}
}
}
#endif

View File

@ -0,0 +1,97 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include <cmath>
#include "Pregel/CommonFormats.h"
#include "Pregel/Graph.h"
using namespace arangodb::pregel;
// leading zeros
// https://github.com/hideo55/cpp-HyperLogLog/blob/master/include/hyperloglog.hpp
#if defined(__has_builtin) && (defined(__GNUC__) || defined(__clang__))
#define _GET_CLZ(x) ::__builtin_clz(x)
#else
inline uint8_t _get_leading_zero_count(uint32_t x) {
#if defined(_MSC_VER)
uint32_t leading_zero_len = 32;
::_BitScanReverse(&leading_zero_len, x);
--leading_zero_len;
return (uint8_t)leading_zero_len;
#else
uint8_t v = 1;
while (!(x & 0x80000000)) {
v++;
x <<= 1;
}
return v;
#endif
}
#define _GET_CLZ(x) _get_leading_zero_count(x)
#endif /* defined(__GNUC__) */
static std::hash<PregelID> _hashFn;
void HLLCounter::addNode(PregelID const& pregelId) {
size_t hash = _hashFn(pregelId);
// last 6 bits as bucket index
size_t mask = NUM_BUCKETS - 1;
size_t bucketIndex = (size_t)(hash & mask);
// throw away last 6 bits
hash >>= 6;
// make sure the 6 new zeroes don't impact estimate
hash |= 0xfc00000000000000L;
// hash has now 58 significant bits left
_buckets[bucketIndex] = (uint8_t)(_GET_CLZ(hash) + 1);
}
int32_t HLLCounter::getCount() {
int32_t m2 = NUM_BUCKETS * NUM_BUCKETS;
double sum = 0.0;
for (int i = 0; i < NUM_BUCKETS; i++) {
sum += pow(2.0, -_buckets[i]);
}
int32_t estimate = (int32_t)(ALPHA * m2 * (1.0 / sum));
if (estimate < 2.5 * NUM_BUCKETS) {
// look for empty buckets
double V = 0;
for (int i = 0; i < NUM_BUCKETS; i++) {
if (_buckets[i] == 0) {
V++;
}
}
if (V != 0) {
return (int32_t)(NUM_BUCKETS * log((double)NUM_BUCKETS / V));
}
}
return estimate;
}
void HLLCounter::merge(HLLCounter const& other) {
// take the maximum of each bucket pair
for (size_t i = 0; i < HLLCounter::NUM_BUCKETS; i++) {
if (_buckets[i] < other._buckets[i]) {
_buckets[i] = other._buckets[i];
}
}
}

View File

@ -0,0 +1,63 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_ALGOS_EC_HLLCOUNTER_H
#define ARANGODB_PREGEL_ALGOS_EC_HLLCOUNTER_H 1
#include "Pregel/Graph.h"
#include "Pregel/GraphFormat.h"
#include "Pregel/MessageFormat.h"
#include "Pregel/MessageCombiner.h"
#include "Pregel/CommonFormats.h"
namespace arangodb {
namespace pregel {
struct HLLCounterFormat : public MessageFormat<HLLCounter> {
HLLCounterFormat() {}
void unwrapValue(VPackSlice s, HLLCounter& senderVal) const override {
VPackArrayIterator array(s);
for (size_t i = 0; i < HLLCounter::NUM_BUCKETS; i++) {
senderVal._buckets[i] = (uint8_t) (*array).getUInt();
++array;
}
}
void addValue(VPackBuilder& arrayBuilder,
HLLCounter const& senderVal) const override {
// TODO fucking pack 8-bytes into one 64 bit entry
arrayBuilder.openArray();
for (size_t i = 0; i < HLLCounter::NUM_BUCKETS; i++) {
arrayBuilder.add(VPackValue(senderVal._buckets[i]));
}
arrayBuilder.close();
}
};
struct HLLCounterCombiner : MessageCombiner<HLLCounter> {
void combine(HLLCounter& firstValue, HLLCounter const& secondValue) const override {
firstValue.merge(secondValue);
};
};
}
}
#endif

View File

@ -42,7 +42,7 @@ using namespace arangodb::pregel::algos;
static std::string const kMoreIterations = "more";
static const double RESTART_PROB = 0.15;
static const double EPS = 0.000001;
static const double EPS = 0.0000001;
LineRank::LineRank(arangodb::velocypack::Slice params)
: SimpleAlgorithm("LineRank", params) {

View File

@ -34,6 +34,29 @@
namespace arangodb {
namespace pregel {
/// A counter for counting unique vertex IDs using a HyperLogLog sketch.
/// @author Aljoscha Krettek, Robert Metzger, Robert Waury
struct HLLCounter {
friend struct HLLCounterFormat;
constexpr static int32_t NUM_BUCKETS = 64;
constexpr static double ALPHA = 0.709;
int32_t getCount();
void addNode(PregelID const& pregelId);
void merge(HLLCounter const& counter);
private:
uint8_t _buckets[NUM_BUCKETS] = {0};
};
/// Effective closeness value
struct ECValue {
HLLCounter counter;
std::vector<int32_t> shortestPaths;
};
struct SCCValue {
std::vector<PregelID> parents;
uint64_t vertexID;

View File

@ -65,8 +65,7 @@ Conductor::Conductor(
Conductor::~Conductor() { this->cancel(); }
void Conductor::start(std::string const& algoName,
VPackSlice const& config) {
void Conductor::start(std::string const& algoName, VPackSlice const& config) {
if (!config.isObject()) {
_userParams.openObject();
_userParams.close();
@ -85,7 +84,8 @@ void Conductor::start(std::string const& algoName,
_masterContext.reset(_algorithm->masterContext(config));
_aggregators.reset(new AggregatorHandler(_algorithm.get()));
_maxSuperstep = VelocyPackHelper::getNumericValue(config, "maxGSS", _maxSuperstep);
_maxSuperstep =
VelocyPackHelper::getNumericValue(config, "maxGSS", _maxSuperstep);
// configure the async mode as optional
VPackSlice async = _userParams.slice().get("async");
_asyncMode = _algorithm->supportsAsyncMode();
@ -130,7 +130,8 @@ bool Conductor::_startGlobalStep() {
int res = _sendToAllDBServers(Utils::prepareGSSPath, b.slice(), requests);
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
LOG_TOPIC(INFO, Logger::PREGEL) << "Seems there is at least one worker out of order";
LOG_TOPIC(INFO, Logger::PREGEL)
<< "Seems there is at least one worker out of order";
Utils::printResponses(requests);
// the recovery mechanisms should take care of this
return false;
@ -168,7 +169,7 @@ bool Conductor::_startGlobalStep() {
// tells workers to store / discard results
if (_storeResults) {
_finalizeWorkers();
} else {// just stop the timer
} else { // just stop the timer
_endTimeSecs = TRI_microtime();
LOG_TOPIC(INFO, Logger::PREGEL) << "Done execution took"
<< totalRuntimeSecs() << " s";
@ -196,10 +197,12 @@ bool Conductor::_startGlobalStep() {
res = _sendToAllDBServers(Utils::startGSSPath, b.slice()); // call me maybe
if (res != TRI_ERROR_NO_ERROR) {
_state = ExecutionState::IN_ERROR;
LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor could not start GSS " << _globalSuperstep;
LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor could not start GSS "
<< _globalSuperstep;
// the recovery mechanisms should take care od this
} else {
LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor started new gss " << _globalSuperstep;
LOG_TOPIC(INFO, Logger::PREGEL) << "Conductor started new gss "
<< _globalSuperstep;
}
return res == TRI_ERROR_NO_ERROR;
}
@ -209,7 +212,8 @@ void Conductor::finishedWorkerStartup(VPackSlice const& data) {
MUTEX_LOCKER(guard, _callbackMutex);
_ensureUniqueResponse(data);
if (_state != ExecutionState::RUNNING) {
LOG_TOPIC(WARN, Logger::PREGEL) << "We are not in a state where we expect a response";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "We are not in a state where we expect a response";
return;
}
@ -219,8 +223,8 @@ void Conductor::finishedWorkerStartup(VPackSlice const& data) {
return;
}
LOG_TOPIC(INFO, Logger::PREGEL) << _totalVerticesCount << " vertices, " << _totalEdgesCount
<< " edges";
LOG_TOPIC(INFO, Logger::PREGEL) << _totalVerticesCount << " vertices, "
<< _totalEdgesCount << " edges";
if (_masterContext) {
_masterContext->_globalSuperstep = 0;
_masterContext->_vertexCount = _totalVerticesCount;
@ -250,7 +254,8 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
if (gss != _globalSuperstep ||
!(_state == ExecutionState::RUNNING ||
_state == ExecutionState::CANCELED)) {
LOG_TOPIC(WARN, Logger::PREGEL) << "Conductor did received a callback from the wrong superstep";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Conductor did received a callback from the wrong superstep";
return VPackBuilder();
}
@ -280,8 +285,9 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
return response;
}
LOG_TOPIC(INFO, Logger::PREGEL) << "Finished gss " << _globalSuperstep << " in "
<< (TRI_microtime() - _computationStartTimeSecs) << "s";
LOG_TOPIC(INFO, Logger::PREGEL)
<< "Finished gss " << _globalSuperstep << " in "
<< (TRI_microtime() - _computationStartTimeSecs) << "s";
_globalSuperstep++;
// don't block the response for workers waiting on this callback
@ -293,10 +299,12 @@ VPackBuilder Conductor::finishedWorkerStep(VPackSlice const& data) {
if (_state == ExecutionState::RUNNING) {
_startGlobalStep(); // trigger next superstep
} else if (_state == ExecutionState::CANCELED) {
LOG_TOPIC(WARN, Logger::PREGEL) << "Execution was canceled, results will be discarded.";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Execution was canceled, results will be discarded.";
_finalizeWorkers(); // tells workers to store / discard results
} else { // this prop shouldn't occur unless we are recovering or in error
LOG_TOPIC(WARN, Logger::PREGEL) << "No further action taken after receiving all responses";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "No further action taken after receiving all responses";
}
});
return VPackBuilder();
@ -306,7 +314,8 @@ void Conductor::finishedRecoveryStep(VPackSlice const& data) {
MUTEX_LOCKER(guard, _callbackMutex);
_ensureUniqueResponse(data);
if (_state != ExecutionState::RECOVERING) {
LOG_TOPIC(WARN, Logger::PREGEL) << "We are not in a state where we expect a recovery response";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "We are not in a state where we expect a recovery response";
return;
}
@ -591,12 +600,14 @@ int Conductor::_finalizeWorkers() {
_aggregators->serializeValues(b);
b.close();
LOG_TOPIC(INFO, Logger::PREGEL) << "Done. We did " << _globalSuperstep << " rounds";
LOG_TOPIC(INFO, Logger::PREGEL) << "Startup Time: " << _computationStartTimeSecs - _startTimeSecs
<< "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Computation Time: " << compEnd - _computationStartTimeSecs
<< "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Storage Time: " << TRI_microtime() - compEnd << "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Done. We did " << _globalSuperstep
<< " rounds";
LOG_TOPIC(INFO, Logger::PREGEL)
<< "Startup Time: " << _computationStartTimeSecs - _startTimeSecs << "s";
LOG_TOPIC(INFO, Logger::PREGEL)
<< "Computation Time: " << compEnd - _computationStartTimeSecs << "s";
LOG_TOPIC(INFO, Logger::PREGEL)
<< "Storage Time: " << TRI_microtime() - compEnd << "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Overall: " << totalRuntimeSecs() << "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Stats: " << b.toString();
return res;
@ -606,7 +617,7 @@ VPackBuilder Conductor::collectAQLResults() {
if (_state != ExecutionState::DONE) {
return VPackBuilder();
}
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
@ -616,7 +627,7 @@ VPackBuilder Conductor::collectAQLResults() {
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
VPackBuilder messages;
for (auto const& req : requests) {
VPackSlice payload = req.result.answer->payload();
@ -654,7 +665,8 @@ int Conductor::_sendToAllDBServers(std::string const& suffix,
size_t nrDone = 0;
size_t nrGood = cc->performRequests(requests, 5.0 * 60.0, nrDone,
LogTopic("Pregel Conductor"));
LOG_TOPIC(INFO, Logger::PREGEL) << "Send " << suffix << " to " << nrDone << " servers";
LOG_TOPIC(INFO, Logger::PREGEL) << "Send " << suffix << " to " << nrDone
<< " servers";
Utils::printResponses(requests);
return nrGood == requests.size() ? TRI_ERROR_NO_ERROR : TRI_ERROR_FAILED;
@ -664,7 +676,8 @@ void Conductor::_ensureUniqueResponse(VPackSlice body) {
// check if this the only time we received this
ServerID sender = body.get(Utils::senderKey).copyString();
if (_respondedServers.find(sender) != _respondedServers.end()) {
LOG_TOPIC(ERR, Logger::PREGEL) << "Received response already from " << sender;
LOG_TOPIC(ERR, Logger::PREGEL) << "Received response already from "
<< sender;
THROW_ARANGO_EXCEPTION(TRI_ERROR_ARANGO_CONFLICT);
}
_respondedServers.insert(sender);

View File

@ -23,6 +23,10 @@
#ifndef ARANGODB_PREGEL_GRAPH_STRUCTURE_H
#define ARANGODB_PREGEL_GRAPH_STRUCTURE_H 1
#include <cstdint>
#include <string>
#include <functional>
namespace arangodb {
namespace pregel {
@ -157,11 +161,11 @@ class VertexEntry {
};*/
}
}
/*
namespace std {
template <>
struct hash<arangodb::pregel::PregelID> {
std::size_t operator()(const PregelID& k) const {
std::size_t operator()(const arangodb::pregel::PregelID& k) const {
using std::size_t;
using std::hash;
using std::string;
@ -170,10 +174,10 @@ namespace std {
// second and third and combine them using XOR
// and bit shifting:
std::size_t h1 = std::hash<string>()(k.key);
std::size_t h2 = std::hash<prgl_shard_t>()(k.shard);
std::size_t h2 = std::hash<int32_t>()(k.shard);
return h1 ^ (h2 << 1);
}
};
}*/
}
#endif

View File

@ -90,10 +90,12 @@ std::map<ShardID, uint64_t> GraphStore<V, E>::_allocateMemory() {
}
_edges.resize(count);
if (countTrx->commit() != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(WARN, Logger::PREGEL) << "Pregel worker: Failed to commit on a read transaction";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Pregel worker: Failed to commit on a read transaction";
}
LOG_TOPIC(INFO, Logger::PREGEL) << "Allocating memory took " << TRI_microtime() - t << "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Allocating memory took "
<< TRI_microtime() - t << "s";
return shardSizes;
}
@ -101,7 +103,7 @@ template <typename V, typename E>
void GraphStore<V, E>::loadShards(WorkerConfig* config,
std::function<void()> callback) {
_config = config;
std::map<ShardID, uint64_t> shardSizes (_allocateMemory());
std::map<ShardID, uint64_t> shardSizes(_allocateMemory());
ThreadPool* pool = PregelFeature::instance()->threadPool();
uint64_t vertexOffset = 0, edgeOffset = 0;
@ -112,7 +114,8 @@ void GraphStore<V, E>::loadShards(WorkerConfig* config,
_config->edgeCollectionShards();
_runningThreads = config->localVertexShardIDs().size();
LOG_TOPIC(INFO, Logger::PREGEL) << "Using " << _runningThreads << " threads to load data";
LOG_TOPIC(INFO, Logger::PREGEL) << "Using " << _runningThreads
<< " threads to load data";
for (auto const& pair : vertexCollMap) {
std::vector<ShardID> const& vertexShards = pair.second;
for (size_t i = 0; i < vertexShards.size(); i++) {
@ -227,7 +230,8 @@ void GraphStore<V, E>::loadDocument(WorkerConfig* config,
}
}
if (trx->commit() != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(WARN, Logger::PREGEL) << "Pregel worker: Failed to commit on a read transaction";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Pregel worker: Failed to commit on a read transaction";
}
}
@ -253,7 +257,8 @@ void GraphStore<V, E>::replaceVertexData(VertexEntry const* entry, void* data,
// if (size <= entry->_vertexDataOffset)
void* ptr = _vertexData.data() + entry->_vertexDataOffset;
memcpy(ptr, data, size);
LOG_TOPIC(WARN, Logger::PREGEL) << "Don't use this function with varying sizes";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Don't use this function with varying sizes";
}
template <typename V, typename E>
@ -301,19 +306,19 @@ void GraphStore<V, E>::_loadVertices(ShardID const& vertexShard,
LogicalCollection* collection = cursor->collection();
uint64_t number = collection->numberDocuments();
_graphFormat->willLoadVertices(number);
auto cb = [&] (DocumentIdentifierToken const& token) {
auto cb = [&](DocumentIdentifierToken const& token) {
if (collection->readDocument(trx.get(), mmdr, token)) {
VPackSlice document(mmdr.vpack());
if (document.isExternal()) {
document = document.resolveExternal();
}
VertexEntry& ventry = _index[vertexOffset];
ventry._shard = sourceShard;
ventry._key = document.get(StaticStrings::KeyString).copyString();
ventry._edgeDataOffset = edgeOffset;
// load vertex data
std::string documentId = trx->extractIdString(document);
if (_graphFormat->estimatedVertexSize() > 0) {
@ -336,7 +341,8 @@ void GraphStore<V, E>::_loadVertices(ShardID const& vertexShard,
_localVerticeCount += (vertexOffset - originalVertexOffset);
if (trx->commit() != TRI_ERROR_NO_ERROR) {
LOG_TOPIC(WARN, Logger::PREGEL) << "Pregel worker: Failed to commit on a read transaction";
LOG_TOPIC(WARN, Logger::PREGEL)
<< "Pregel worker: Failed to commit on a read transaction";
}
}
@ -359,19 +365,19 @@ void GraphStore<V, E>::_loadEdges(Transaction* trx, ShardID const& edgeShard,
}
LogicalCollection* collection = cursor->collection();
auto cb = [&] (DocumentIdentifierToken const& token) {
auto cb = [&](DocumentIdentifierToken const& token) {
if (collection->readDocument(trx, mmdr, token)) {
VPackSlice document(mmdr.vpack());
if (document.isExternal()) {
document = document.resolveExternal();
}
// LOG_TOPIC(INFO, Logger::PREGEL) << "Loaded Edge: " << document.toJson();
std::string toValue =
document.get(StaticStrings::ToString).copyString();
// LOG_TOPIC(INFO, Logger::PREGEL) << "Loaded Edge: " <<
// document.toJson();
std::string toValue = document.get(StaticStrings::ToString).copyString();
std::size_t pos = toValue.find('/');
std::string collectionName = toValue.substr(0, pos);
// If this is called from loadDocument we didn't preallocate the vector
if (_edges.size() <= offset) {
if (!_config->lazyLoading()) {
@ -381,10 +387,9 @@ void GraphStore<V, E>::_loadEdges(Transaction* trx, ShardID const& edgeShard,
}
Edge<E>& edge(_edges[offset]);
edge._toKey = toValue.substr(pos + 1, toValue.length() - pos - 1);
auto collInfo =
Utils::resolveCollection(_config->database(), collectionName,
_config->collectionPlanIdMap());
auto collInfo = Utils::resolveCollection(
_config->database(), collectionName, _config->collectionPlanIdMap());
if (collInfo) {
// resolve the shard of the target vertex.
ShardID responsibleShard;
@ -399,12 +404,11 @@ void GraphStore<V, E>::_loadEdges(Transaction* trx, ShardID const& edgeShard,
}
offset++;
}
}
};
while (cursor->getMore(cb, 1000)) {
}
// Add up all added elements
size_t added = offset - originalOffset;
vertexEntry._edgeCount += added;
@ -415,12 +419,12 @@ void GraphStore<V, E>::_loadEdges(Transaction* trx, ShardID const& edgeShard,
/// Should not dead-lock unless we have to wait really long for other threads
template <typename V, typename E>
void GraphStore<V, E>::_storeVertices(std::vector<ShardID> const& globalShards,
RangeIterator<VertexEntry> &it) {
RangeIterator<VertexEntry>& it) {
// transaction on one shard
std::unique_ptr<ExplicitTransaction> trx;
prgl_shard_t currentShard = (prgl_shard_t)-1;
int res = TRI_ERROR_NO_ERROR;
// loop over vertices
while (it != it.end()) {
if (it->shard() != currentShard) {
@ -432,11 +436,9 @@ void GraphStore<V, E>::_storeVertices(std::vector<ShardID> const& globalShards,
}
currentShard = it->shard();
ShardID const& shard = globalShards[currentShard];
trx.reset(
new ExplicitTransaction(
StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), {},
{shard}, {}, Transaction::DefaultLockTimeout, false, false)
);
trx.reset(new ExplicitTransaction(
StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), {},
{shard}, {}, Transaction::DefaultLockTimeout, false, false));
res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
@ -483,12 +485,13 @@ template <typename V, typename E>
void GraphStore<V, E>::storeResults(WorkerConfig const& state) {
double now = TRI_microtime();
//for (auto const& shard : state.localEdgeShardIDs()) {
// for (auto const& shard : state.localEdgeShardIDs()) {
// writeColls.push_back(shard);
//}
std::atomic<size_t> tCount(0);
size_t total = _index.size();
size_t delta = std::max(10UL, _index.size() / state.localVertexShardIDs().size());
size_t delta =
std::max(10UL, _index.size() / state.localVertexShardIDs().size());
size_t start = 0, end = delta;
do {
@ -498,21 +501,23 @@ void GraphStore<V, E>::storeResults(WorkerConfig const& state) {
try {
RangeIterator<VertexEntry> it = vertexIterator(start, end);
_storeVertices(state.globalShardIDs(), it);
} catch(...) {
} catch (...) {
LOG_TOPIC(ERR, Logger::PREGEL) << "Storing vertex data failed";
}
tCount--;
});
start = end; end = end + delta;
start = end;
end = end + delta;
if (total < end + delta) { // swallow the rest
end = total;
}
} while (start != end);
while (tCount > 0 && !_destroyed) {
usleep(25 * 1000);// 25ms
usleep(25 * 1000); // 25ms
}
LOG_TOPIC(INFO, Logger::PREGEL) << "Storing data took " << (TRI_microtime() - now) << "s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Storing data took "
<< (TRI_microtime() - now) << "s";
}
template class arangodb::pregel::GraphStore<int64_t, int64_t>;
@ -522,3 +527,4 @@ template class arangodb::pregel::GraphStore<double, double>;
// specific algo combos
template class arangodb::pregel::GraphStore<SCCValue, int32_t>;
template class arangodb::pregel::GraphStore<ECValue, int32_t>;

View File

@ -121,30 +121,31 @@ void ArrayInCache<M>::_set(prgl_shard_t shard, std::string const& key,
}
template <typename M>
void ArrayInCache<M>::mergeCache(WorkerConfig const& config, InCache<M> const* otherCache) {
void ArrayInCache<M>::mergeCache(WorkerConfig const& config,
InCache<M> const* otherCache) {
ArrayInCache<M>* other = (ArrayInCache<M>*)otherCache;
this->_containedMessageCount += other->_containedMessageCount;
// ranomize access to buckets, don't wait for the lock
std::set<prgl_shard_t> const& shardIDs = config.localPregelShardIDs();
std::vector<prgl_shard_t> randomized(shardIDs.begin(), shardIDs.end());
std::random_shuffle(randomized.begin(), randomized.end());
size_t i = 0;
do {
i = (i + 1) % randomized.size();
prgl_shard_t shardId = randomized[i];
auto const& it = other->_shardMap.find(shardId);
if (it != other->_shardMap.end() && it->second.size() > 0) {
TRY_MUTEX_LOCKER(guard, this->_bucketLocker[shardId]);
if (guard.isLocked() == false) {
if (i == 0) {// eventually we hit the last one
usleep(100);// don't busy wait
if (i == 0) { // eventually we hit the last one
usleep(100); // don't busy wait
}
continue;
}
// only access bucket after we aquired the lock
HMap& myVertexMap = _shardMap[shardId];
for (auto& vertexMessage : it->second) {
@ -153,7 +154,7 @@ void ArrayInCache<M>::mergeCache(WorkerConfig const& config, InCache<M> const* o
a.insert(a.end(), b.begin(), b.end());
}
}
randomized.erase(randomized.begin() + i);
} while (randomized.size() > 0);
}
@ -176,7 +177,7 @@ MessageIterator<M> ArrayInCache<M>::getMessages(prgl_shard_t shard,
template <typename M>
void ArrayInCache<M>::clear() {
for (auto& pair : _shardMap) {
//MUTEX_LOCKER(guard, this->_bucketLocker[pair.first]);
// MUTEX_LOCKER(guard, this->_bucketLocker[pair.first]);
pair.second.clear();
}
this->_containedMessageCount = 0;
@ -241,23 +242,23 @@ void CombiningInCache<M>::mergeCache(WorkerConfig const& config,
InCache<M> const* otherCache) {
CombiningInCache<M>* other = (CombiningInCache<M>*)otherCache;
this->_containedMessageCount += other->_containedMessageCount;
// ranomize access to buckets, don't wait for the lock
std::set<prgl_shard_t> const& shardIDs = config.localPregelShardIDs();
std::vector<prgl_shard_t> randomized(shardIDs.begin(), shardIDs.end());
std::random_shuffle(randomized.begin(), randomized.end());
size_t i = 0;
do {
i = (i + 1) % randomized.size();
prgl_shard_t shardId = randomized[i];
auto const& it = other->_shardMap.find(shardId);
if (it != other->_shardMap.end() && it->second.size() > 0) {
TRY_MUTEX_LOCKER(guard, this->_bucketLocker[shardId]);
if (guard.isLocked() == false) {
if (i == 0) {// eventually we hit the last one
usleep(100);// don't busy wait
if (i == 0) { // eventually we hit the last one
usleep(100); // don't busy wait
}
continue;
}
@ -273,7 +274,7 @@ void CombiningInCache<M>::mergeCache(WorkerConfig const& config,
}
}
}
randomized.erase(randomized.begin() + i);
} while (randomized.size() > 0);
}
@ -338,3 +339,6 @@ template class arangodb::pregel::CombiningInCache<double>;
template class arangodb::pregel::InCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::ArrayInCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::CombiningInCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::InCache<HLLCounter>;
template class arangodb::pregel::ArrayInCache<HLLCounter>;
template class arangodb::pregel::CombiningInCache<HLLCounter>;

View File

@ -81,7 +81,8 @@ void ArrayOutCache<M>::appendMessage(prgl_shard_t shard, std::string const& key,
template <typename M>
void ArrayOutCache<M>::flushMessages() {
// LOG_TOPIC(INFO, Logger::PREGEL) << "Beginning to send messages to other machines";
// LOG_TOPIC(INFO, Logger::PREGEL) << "Beginning to send messages to other
// machines";
uint64_t gss = this->_config->globalSuperstep();
if (this->_sendToNextGSS) {
gss += 1;
@ -128,7 +129,8 @@ void ArrayOutCache<M>::flushMessages() {
requests.emplace_back("shard:" + shardId, rest::RequestType::POST,
this->_baseUrl + Utils::messagesPath, body);
// LOG_TOPIC(INFO, Logger::PREGEL) << "Worker: Sending data to other Shard: " << shardId;
// LOG_TOPIC(INFO, Logger::PREGEL) << "Worker: Sending data to other Shard:
// " << shardId;
//<< ". Message: " << package.toJson();
}
size_t nrDone = 0;
@ -178,7 +180,7 @@ void CombiningOutCache<M>::appendMessage(prgl_shard_t shard,
vertexMap.emplace(key, data);
if (++(this->_containedMessages) >= this->_batchSize) {
//LOG_TOPIC(INFO, Logger::PREGEL) << "Hit buffer limit";
// LOG_TOPIC(INFO, Logger::PREGEL) << "Hit buffer limit";
flushMessages();
}
}
@ -187,7 +189,8 @@ void CombiningOutCache<M>::appendMessage(prgl_shard_t shard,
template <typename M>
void CombiningOutCache<M>::flushMessages() {
// LOG_TOPIC(INFO, Logger::PREGEL) << "Beginning to send messages to other machines";
// LOG_TOPIC(INFO, Logger::PREGEL) << "Beginning to send messages to other
// machines";
uint64_t gss = this->_config->globalSuperstep();
if (this->_sendToNextGSS && this->_config->asynchronousMode()) {
gss += 1;
@ -230,7 +233,8 @@ void CombiningOutCache<M>::flushMessages() {
requests.emplace_back("shard:" + shardId, rest::RequestType::POST,
this->_baseUrl + Utils::messagesPath, body);
// LOG_TOPIC(INFO, Logger::PREGEL) << "Worker: Sending data to other Shard: " << shardId;
// LOG_TOPIC(INFO, Logger::PREGEL) << "Worker: Sending data to other Shard:
// " << shardId;
// << ". Message: " << package.toJson();
}
size_t nrDone = 0;
@ -256,3 +260,6 @@ template class arangodb::pregel::CombiningOutCache<double>;
template class arangodb::pregel::OutCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::ArrayOutCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::CombiningOutCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::OutCache<HLLCounter>;
template class arangodb::pregel::ArrayOutCache<HLLCounter>;
template class arangodb::pregel::CombiningOutCache<HLLCounter>;

View File

@ -63,7 +63,8 @@ size_t PregelFeature::availableParallelism() {
const size_t procNum = TRI_numberProcessors();
if (procNum <= 1)
return 1;
else return procNum;// use full performance on cluster
else
return procNum; // use full performance on cluster
}
void PregelFeature::start() {

View File

@ -158,7 +158,8 @@ int RecoveryManager::filterGoodServers(std::vector<ServerID> const& servers,
result.slice()[0].get(std::vector<std::string>(
{AgencyCommManager::path(), "Supervision", "Health"}));
LOG_TOPIC(INFO, Logger::PREGEL) << "Server Status: " << serversRegistered.toJson();
LOG_TOPIC(INFO, Logger::PREGEL) << "Server Status: "
<< serversRegistered.toJson();
if (serversRegistered.isObject()) {
for (auto const& res : VPackObjectIterator(serversRegistered)) {

View File

@ -89,8 +89,9 @@ void Utils::printResponses(std::vector<ClusterCommRequest> const& requests) {
auto& res = req.result;
if (res.status == CL_COMM_RECEIVED &&
res.answer_code != rest::ResponseCode::OK) {
LOG_TOPIC(ERR, Logger::PREGEL) << "Error sending request to " << req.destination
<< ". Payload: " << res.answer->payload().toJson();
LOG_TOPIC(ERR, Logger::PREGEL)
<< "Error sending request to " << req.destination
<< ". Payload: " << res.answer->payload().toJson();
}
}
}
@ -103,7 +104,8 @@ std::shared_ptr<LogicalCollection> Utils::resolveCollection(
if (it != collectionPlanIdMap.end()) {
return ci->getCollection(database, it->second);
}
LOG_TOPIC(ERR, Logger::PREGEL) << "The collection could not be translated to a planID";
LOG_TOPIC(ERR, Logger::PREGEL)
<< "The collection could not be translated to a planID";
// THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
// "The collection could not be translated to a
// planID");

View File

@ -50,10 +50,10 @@ using namespace arangodb::pregel;
#ifdef TRI_SHOW_LOCK_TIME
#define MY_READ_LOCKER(obj, lock) \
ReadLocker<ReadWriteLock> obj(&lock, __FILE__, __LINE__)
ReadLocker<ReadWriteLock> obj(&lock, __FILE__, __LINE__)
#define MY_WRITE_LOCKER(obj, lock) \
WriteLocker<ReadWriteLock> obj(&lock, __FILE__, __LINE__)
WriteLocker<ReadWriteLock> obj(&lock, __FILE__, __LINE__)
#else
@ -63,8 +63,6 @@ WriteLocker<ReadWriteLock> obj(&lock, __FILE__, __LINE__)
#endif
template <typename V, typename E, typename M>
Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
VPackSlice initConfig)
@ -140,7 +138,6 @@ Worker<V, E, M>::~Worker() {
_writeCache = nullptr;
}
template <typename V, typename E, typename M>
void Worker<V, E, M>::_initializeMessageCaches() {
const size_t p = _config.parallelism();
@ -151,14 +148,14 @@ void Worker<V, E, M>::_initializeMessageCaches() {
_messageCombiner.get());
if (_config.asynchronousMode()) {
_writeCacheNextGSS = new CombiningInCache<M>(
&_config, _messageFormat.get(), _messageCombiner.get());
&_config, _messageFormat.get(), _messageCombiner.get());
}
for (size_t i = 0; i < p; i++) {
auto incoming = std::make_unique<CombiningInCache<M>>(nullptr, _messageFormat.get(),
_messageCombiner.get());
auto incoming = std::make_unique<CombiningInCache<M>>(
nullptr, _messageFormat.get(), _messageCombiner.get());
_inCaches.push_back(incoming.get());
_outCaches.push_back(
new CombiningOutCache<M>(&_config, incoming.release(), _writeCacheNextGSS));
_outCaches.push_back(new CombiningOutCache<M>(
&_config, incoming.release(), _writeCacheNextGSS));
}
} else {
_readCache = new ArrayInCache<M>(&_config, _messageFormat.get());
@ -169,7 +166,7 @@ void Worker<V, E, M>::_initializeMessageCaches() {
for (size_t i = 0; i < p; i++) {
_inCaches.push_back(new ArrayInCache<M>(nullptr, _messageFormat.get()));
_outCaches.push_back(
new ArrayOutCache<M>(&_config, _inCaches.back(), _writeCacheNextGSS));
new ArrayOutCache<M>(&_config, _inCaches.back(), _writeCacheNextGSS));
}
}
}
@ -181,7 +178,8 @@ VPackBuilder Worker<V, E, M>::prepareGlobalStep(VPackSlice const& data) {
MUTEX_LOCKER(guard, _commandMutex);
if (_state != WorkerState::IDLE) {
LOG_TOPIC(ERR, Logger::PREGEL) << "Expected GSS " << _expectedGSS;
LOG_TOPIC(ERR, Logger::PREGEL) << "Cannot prepare a gss when the worker is not idle";
LOG_TOPIC(ERR, Logger::PREGEL)
<< "Cannot prepare a gss when the worker is not idle";
THROW_ARANGO_EXCEPTION(TRI_ERROR_INTERNAL);
}
_state = WorkerState::PREPARING; // stop any running step
@ -212,7 +210,7 @@ VPackBuilder Worker<V, E, M>::prepareGlobalStep(VPackSlice const& data) {
_config._globalSuperstep = gss;
// write cache becomes the readable cache
if (_config.asynchronousMode()) {
MY_WRITE_LOCKER(wguard, _cacheRWLock);// by design shouldn't be necessary
MY_WRITE_LOCKER(wguard, _cacheRWLock); // by design shouldn't be necessary
TRI_ASSERT(_readCache->containedMessageCount() == 0);
TRI_ASSERT(_writeCache->containedMessageCount() == 0);
std::swap(_readCache, _writeCacheNextGSS);
@ -273,7 +271,8 @@ void Worker<V, E, M>::receivedMessages(VPackSlice const& data) {
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Superstep out of sync");
LOG_TOPIC(ERR, Logger::PREGEL) << "Expected: " << _config._globalSuperstep << "Got: " << gss;
LOG_TOPIC(ERR, Logger::PREGEL) << "Expected: " << _config._globalSuperstep
<< "Got: " << gss;
}
}
@ -343,7 +342,8 @@ void Worker<V, E, M>::_startProcessing() {
_finishedProcessing(); // last thread turns the lights out
}
});
start = end; end = end + delta;
start = end;
end = end + delta;
if (total < end + delta) { // swallow the rest
end = total;
}
@ -363,13 +363,13 @@ void Worker<V, E, M>::_initializeVertexContext(VertexContext<V, E, M>* ctx) {
// internally called in a WORKER THREAD!!
template <typename V, typename E, typename M>
bool Worker<V, E, M>::_processVertices(size_t threadId,
RangeIterator<VertexEntry>& vertexIterator) {
bool Worker<V, E, M>::_processVertices(
size_t threadId, RangeIterator<VertexEntry>& vertexIterator) {
double start = TRI_microtime();
// thread local caches
InCache<M> *inCache = _inCaches[threadId];
OutCache<M> *outCache = _outCaches[threadId];
InCache<M>* inCache = _inCaches[threadId];
OutCache<M>* outCache = _outCaches[threadId];
outCache->setBatchSize(_messageBatchSize);
if (_config.asynchronousMode()) {
outCache->sendToNextGSS(_requestedNextGSS);
@ -419,15 +419,15 @@ bool Worker<V, E, M>::_processVertices(size_t threadId,
_writeCache->mergeCache(_config, inCache);
// TODO ask how to implement message sending without waiting for a response
t = TRI_microtime() - t;
MessageStats stats;
stats.sendCount = outCache->sendCount();
stats.superstepRuntimeSecs = TRI_microtime() - start;
if (t > 0.005) {
LOG_TOPIC(INFO, Logger::PREGEL) << "Total " << stats.superstepRuntimeSecs << " s merge took "
<< t << " s";
LOG_TOPIC(INFO, Logger::PREGEL) << "Total " << stats.superstepRuntimeSecs
<< " s merge took " << t << " s";
}
inCache->clear();
outCache->clear();
@ -488,7 +488,8 @@ void Worker<V, E, M>::_finishedProcessing() {
if (_config.asynchronousMode()) {
// just process these vertices in the next superstep
MY_READ_LOCKER(guard, _cacheRWLock);
_writeCache->mergeCache(_config, _readCache); // compute in next superstep
_writeCache->mergeCache(_config,
_readCache); // compute in next superstep
_messageStats.sendCount += _readCache->containedMessageCount();
} else {
// TODO call _startProcessing ???
@ -499,7 +500,7 @@ void Worker<V, E, M>::_finishedProcessing() {
}
}
}
_readCache->clear(); // no need to keep old messages around
_readCache->clear(); // no need to keep old messages around
_expectedGSS = _config._globalSuperstep + 1;
_config._localSuperstep++;
// only set the state here, because _processVertices checks for it
@ -538,7 +539,7 @@ void Worker<V, E, M>::_finishedProcessing() {
VPackSlice data = result->answer->payload();
if (data.isObject()) {
proceed = true;
_conductorAggregators->aggregateValues(data);// only aggregate values
_conductorAggregators->aggregateValues(data); // only aggregate values
VPackSlice nextGSS = data.get(Utils::enterNextGSSKey);
if (nextGSS.isBool()) {
_requestedNextGSS = nextGSS.getBool();
@ -591,13 +592,12 @@ void Worker<V, E, M>::finalizeExecution(VPackSlice const& body) {
}
template <typename V, typename E, typename M>
void Worker<V, E, M>::aqlResult(VPackBuilder *b) const {
void Worker<V, E, M>::aqlResult(VPackBuilder* b) const {
MUTEX_LOCKER(guard, _commandMutex);
b->openArray();
auto it = _graphStore->vertexIterator();
for (VertexEntry const* vertexEntry : it) {
V* data = _graphStore->mutableVertexData(vertexEntry);
b->openObject();
b->add(StaticStrings::KeyString, VPackValue(vertexEntry->key()));
@ -628,7 +628,7 @@ void Worker<V, E, M>::startRecovery(VPackSlice const& data) {
_writeCacheNextGSS->clear();
}
}
VPackBuilder copy(data);
// hack to determine newly added vertices
_preRecoveryTotal = _graphStore->localVertexCount();
@ -733,3 +733,4 @@ template class arangodb::pregel::Worker<double, float, double>;
// custom algorihm types
template class arangodb::pregel::Worker<SCCValue, int32_t,
SenderMessage<uint64_t>>;
template class arangodb::pregel::Worker<ECValue, int32_t, HLLCounter>;

View File

@ -22,8 +22,8 @@
#include "Pregel/WorkerConfig.h"
#include "Pregel/Algorithm.h"
#include "Pregel/Utils.h"
#include "Pregel/PregelFeature.h"
#include "Pregel/Utils.h"
using namespace arangodb;
using namespace arangodb::basics;
@ -38,7 +38,7 @@ WorkerConfig::WorkerConfig(DatabaseID dbname, VPackSlice params)
VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey);
VPackSlice globalShards = params.get(Utils::globalShardListKey);
VPackSlice async = params.get(Utils::asyncModeKey);
if (!coordID.isString() || !edgeShardMap.isObject() ||
!vertexShardMap.isObject() || !execNum.isInteger() ||
!collectionPlanIdMap.isObject() || !globalShards.isArray()) {
@ -49,7 +49,7 @@ WorkerConfig::WorkerConfig(DatabaseID dbname, VPackSlice params)
_coordinatorId = coordID.copyString();
_asynchronousMode = async.getBool();
_lazyLoading = params.get(Utils::lazyLoadingKey).getBool();
VPackSlice userParams = params.get(Utils::userParametersKey);
VPackSlice parallel = userParams.get(Utils::parallelismKey);
_parallelism = PregelFeature::availableParallelism();