1
0
Fork 0

Aggregator support started

This commit is contained in:
Simon Grätzer 2016-11-06 23:20:59 +01:00
parent 64817db293
commit dc6e993038
22 changed files with 328 additions and 124 deletions

View File

@ -363,6 +363,7 @@ SET(ARANGOD_SOURCES
Wal/Slots.cpp
Wal/SynchronizerThread.cpp
Pregel/Algorithm.cpp
Pregel/Aggregator.cpp
Pregel/Conductor.cpp
Pregel/PregelFeature.cpp
Pregel/IncomingCache.cpp
@ -376,7 +377,7 @@ SET(ARANGOD_SOURCES
Pregel/MessageFormat.cpp
Pregel/ResultWriter.cpp
Pregel/Algos/SSSP.cpp
Pregel/Algos/SCC.cpp
Pregel/Algos/PageRank.cpp
${ADDITIONAL_BIN_ARANGOD_SOURCES}
)

View File

@ -23,7 +23,7 @@
#include <cstdint>
#include <string>
#include <velocypack/Iterator.h>
#include <velocypack/Value.h>
#include <velocypack/velocypack-aliases.h>
#ifndef ARANGODB_PREGEL_AGGREGATOR_H
@ -32,14 +32,16 @@
namespace arangodb {
namespace pregel {
template <class T>
template
class Aggregator {
public:
Aggregator(const Aggregator&) = delete;
Aggregator& operator=(const Aggregator&) = delete;
virtual T const& getValue() const = 0;
virtual void reduce(T const& otherValue) = 0;
virtual void const* getValue() const = 0;
virtual void reduce( void const* otherValue) = 0;
virtual void parse(VPackSlice otherValue) = 0;
virtual void serialize(VPackBuilder *b) = 0;
std::string name() { return _name; }
protected:
@ -49,17 +51,24 @@ class Aggregator {
std::string _name;
};
class MinIntegerAggregator : public Aggregator<int64_t> {
MinIntegerAggregator(std::string const& name, int64_t init)
class FloatMinAggregator : public Aggregator {
MinIntegerAggregator(std::string const& name, float init)
: Aggregator(name), _value(init) {}
int64_t const& getValue() const override { return _value; };
void reduce(int64_t const& otherValue) override {
if (otherValue < _value) _value = otherValue;
void const* getValue() const override { return &_value; };
void reduce(void const* otherValue) override {
float other = *((float*)otherValue);
if (other < _value) _value = other;
};
void parse(VPackSlice otherValue) override {
this->reduce(otherValue.getInteger());
};
void serialize(VPackBuilder *b) override {
b.add(name(), _value);
};
private:
int64_t _value;
float _value;
};
}
}

View File

@ -44,10 +44,11 @@ struct Algorithm {
public:
virtual ~Algorithm() {}
// virtual bool isFixpointAlgorithm() const {return false;}
virtual bool preserveTransactions() const { return false; }
//virtual bool preserveTransactions() const { return false; }
virtual size_t estimatedVertexSize() const { return sizeof(V); };
virtual size_t estimatedEdgeSize() const { return sizeof(E); };
virtual void aggregators(std::vector<std::unique_ptr<Aggregator>> &aggregators) {}
virtual std::shared_ptr<GraphFormat<V, E>> inputFormat() const = 0;
virtual std::shared_ptr<MessageFormat<M>> messageFormat() const = 0;

View File

@ -1,68 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_ALGORITHM_H
#define ARANGODB_PREGEL_ALGORITHM_H 1
#include <velocypack/Builder.h>
#include <velocypack/velocypack-aliases.h>
#include <cstdint>
#include "Basics/Common.h"
#include "GraphFormat.h"
#include "MessageCombiner.h"
#include "MessageFormat.h"
namespace arangodb {
namespace pregel {
template <typename V, typename E, typename M>
class VertexComputation;
// specify serialization, whatever
template <typename V, typename E, typename M>
struct Algorithm {
public:
virtual ~Algorithm() {}
// virtual bool isFixpointAlgorithm() const {return false;}
virtual bool preserveTransactions() const { return false; }
virtual size_t estimatedVertexSize() const { return sizeof(V); };
virtual size_t estimatedEdgeSize() const { return sizeof(E); };
virtual std::shared_ptr<GraphFormat<V, E>> inputFormat() const = 0;
virtual std::shared_ptr<MessageFormat<M>> messageFormat() const = 0;
virtual std::shared_ptr<MessageCombiner<M>> messageCombiner() const = 0;
virtual std::shared_ptr<VertexComputation<V, E, M>> createComputation()
const = 0;
std::string const& getName() const { return _name; }
protected:
Algorithm(std::string const& name) : _name(name){};
private:
std::string _name;
};
}
}
#endif

View File

@ -0,0 +1,106 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "SCC.h"
#include "Pregel/VertexComputation.h"
#include "Pregel/GraphFormat.h"
#include "Pregel/Utils.h"
#include "Vocbase/vocbase.h"
#include "Cluster/ClusterInfo.h"
#include "Utils/OperationCursor.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "Utils/Transaction.h"
using namespace arangodb;
using namespace arangodb::pregel;
using namespace arangodb::pregel::algos;
struct SCCGraphFormat : public GraphFormat<int64_t, int64_t> {
uint64_t _currentId = 0;
void willUseCollection(TRI_vocbase_t *vocbase, std::string const& shard, bool isEdgeCollection) override {
int64_t count = Utils::countDocuments(vocbase, shard);
_currentId = ClusterInfo::instance()->uniqid(count);
}
SCCGraphFormat() {}
size_t copyVertexData(VPackSlice document, void* targetPtr,
size_t maxSize) override {
*((int64_t*)targetPtr) = _currentId;
_currentId++;
return sizeof(int64_t);
}
size_t copyEdgeData(VPackSlice document, void* targetPtr,
size_t maxSize) override {
return 0;
}
int64_t readVertexData(void* ptr) override { return *((int64_t*)ptr); }
int64_t readEdgeData(void* ptr) override { return *((int64_t*)ptr); }
};
std::shared_ptr<GraphFormat<int64_t, int64_t>> SCCAlgorithm::inputFormat()
const {
return std::make_shared<IntegerGraphFormat>("value", -1, 1);
}
std::shared_ptr<MessageFormat<int64_t>> SCCAlgorithm::messageFormat() const {
return std::shared_ptr<IntegerMessageFormat>(new IntegerMessageFormat());
}
std::shared_ptr<MessageCombiner<int64_t>> SCCAlgorithm::messageCombiner()
const {
return std::shared_ptr<MinIntegerCombiner>(new MinIntegerCombiner());
}
struct SCCComputation : public VertexComputation<int64_t, int64_t, int64_t> {
SCCComputation() {}
void compute(std::string const& vertexID,
MessageIterator<int64_t> const& messages) override {
/*int64_t tmp = vertexData();
for (const int64_t* msg : messages) {
if (tmp < 0 || *msg < tmp) {
tmp = *msg;
};
}
int64_t* state = mutableVertexData();
if (tmp >= 0 && (getGlobalSuperstep() == 0 || tmp != *state)) {
LOG(INFO) << "Recomputing value for vertex " << vertexID;
*state = tmp; // update state
EdgeIterator<int64_t> edges = getEdges();
for (EdgeEntry<int64_t>* edge : edges) {
int64_t val = *(edge->getData()) + tmp;
uint64_t toID = edge->toPregelID();
sendMessage(toID, val);
}
}*/
voteHalt();
}
};
std::shared_ptr<VertexComputation<int64_t, int64_t, int64_t>>
SCCAlgorithm::createComputation() const {
return std::shared_ptr<SCCComputation>(new SCCComputation());
}

View File

@ -0,0 +1,47 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_ALGOS_SSSP_H
#define ARANGODB_PREGEL_ALGOS_SSSP_H 1
#include "Pregel/Algorithm.h"
namespace arangodb {
namespace pregel {
namespace algos {
/// PageRank
struct PageRankAlgorithm : public Algorithm<float, float, float> {
public:
SCCAlgorithm() : Algorithm("PageRank") {}
void aggregators(std::vector<std::unique_ptr<Aggregator>> &aggregators) override;
std::shared_ptr<GraphFormat<float, float>> inputFormat() const override;
std::shared_ptr<MessageFormat<float>> messageFormat() const override;
std::shared_ptr<MessageCombiner<float>> messageCombiner() const override;
std::shared_ptr<VertexComputation<float, float, float>>
createComputation() const override;
};
}
}
}
#endif

View File

@ -71,7 +71,7 @@ std::shared_ptr<MessageFormat<int64_t>> SCCAlgorithm::messageFormat() const {
std::shared_ptr<MessageCombiner<int64_t>> SCCAlgorithm::messageCombiner()
const {
return std::shared_ptr<MinIntegerCombiner>(new MinIntegerCombiner());
return std::shared_ptr<MinIntegerCombiner>(new IntegerMinCombiner());
}
struct SCCComputation : public VertexComputation<int64_t, int64_t, int64_t> {

View File

@ -66,7 +66,7 @@ std::shared_ptr<MessageFormat<int64_t>> SSSPAlgorithm::messageFormat() const {
std::shared_ptr<MessageCombiner<int64_t>> SSSPAlgorithm::messageCombiner()
const {
return std::shared_ptr<MinIntegerCombiner>(new MinIntegerCombiner());
return std::shared_ptr<MinIntegerCombiner>(new IntegerMinCombiner());
}
std::shared_ptr<VertexComputation<int64_t, int64_t, int64_t>>

View File

@ -0,0 +1,39 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "Pregel/MessageCombiner.h"
#ifndef ARANGODB_PREGEL_FLOAT_SUM_COMBINER_H
#define ARANGODB_PREGEL_FLOAT_SUM_COMBINER_H 1
namespace arangodb {
namespace pregel {
struct FloatSumCombiner : public MessageCombiner<float> {
MinIntegerCombiner() {}
float combine(float const& firstValue,
float const& secondValue) const override {
return firstValue + secondValue;
};
};
}
}
#endif

View File

@ -36,6 +36,9 @@
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
#include "Algorithm.h"
#include "Pregel/Algos/PageRank.h"
using namespace arangodb;
using namespace arangodb::pregel;
@ -52,6 +55,14 @@ Conductor::Conductor(
bool isCoordinator = ServerState::instance()->isCoordinator();
assert(isCoordinator);
LOG(INFO) << "constructed conductor";
if (algorithm == "sssp") {
} else if (algorithm == "pagerank") {
algos::PageRankAlgorithm algo();
algo.aggregators(_aggregators);
} else {
LOG(ERR) << "Unsupported algorithm";
}
}
Conductor::~Conductor() {
@ -71,8 +82,7 @@ static void printResults(std::vector<ClusterCommRequest> const& requests) {
}
static void resolveShards(LogicalCollection const* collection,
std::map<ServerID, std::vector<ShardID>>& serverMap,
std::map<ServerID, std::map<ShardID, std::string>> &serverShardPlanIdMap) {
std::map<ServerID, std::vector<ShardID>>& serverMap) {
ClusterInfo* ci = ClusterInfo::instance();
std::shared_ptr<std::vector<ShardID>> shardIDs =
@ -83,7 +93,6 @@ static void resolveShards(LogicalCollection const* collection,
ci->getResponsibleServer(shard);
if (servers->size() > 0) {
serverMap[(*servers)[0]].push_back(shard);
serverShardPlanIdMap[(*servers)[0]][shard] = collection->planId_as_string();
}
}
}
@ -91,23 +100,25 @@ static void resolveShards(LogicalCollection const* collection,
void Conductor::start() {
ClusterComm* cc = ClusterComm::instance();
int64_t vertexCount = 0, edgeCount = 0;
std::map<ServerID, std::map<ShardID, std::string>> serverShardPlanIdMap;
std::map<CollectionID, std::string> collectionPlanIdMap;
std::map<ServerID, std::vector<ShardID>> edgeServerMap;
for (auto collection : _vertexCollections) {
collectionPlanIdMap[collection->name()] = collection->planId_as_string();
size_t cc = Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name());
if (cc > 0) {
vertexCount += cc;
resolveShards(collection.get(), _vertexServerMap, serverShardPlanIdMap);
resolveShards(collection.get(), _vertexServerMap);
} else {
LOG(WARN) << "Collection does not contain vertices";
}
}
for (auto collection : _edgeCollections) {
collectionPlanIdMap[collection->name()] = collection->planId_as_string();
size_t cc = Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name());
if (cc > 0) {
edgeCount += cc;
resolveShards(collection.get(), edgeServerMap, serverShardPlanIdMap);
resolveShards(collection.get(), edgeServerMap);
} else {
LOG(WARN) << "Collection does not contain edges";
}
@ -147,9 +158,9 @@ void Conductor::start() {
b.add(VPackValue(eit));
}
b.close();
b.add(Utils::shardPlanMapKey, VPackValue(VPackValueType::Object));
for (auto const& shardPair : serverShardPlanIdMap[it.first]) {
b.add(shardPair.first, VPackValue(shardPair.second));
b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object));
for (auto const& pair : collectionPlanIdMap) {
b.add(pair.first, VPackValue(pair.second));
}
b.close();
b.close();
@ -174,8 +185,20 @@ void Conductor::finishedGlobalStep(VPackSlice& data) {
LOG(WARN) << "Conductor did not expect another finishedGlobalStep call";
return;
}
_responseCount++;
VPackSlice aggValues = data.get(Utils::aggregatorsKey);
if (aggValues.isObject()) {
for (std::unique_ptr<Aggregator> &aggregator : _aggregators) {
VPackSlice val = aggValues.get(aggregator->name())
if (!val.isNone()) {
aggregator->parse(val);
}
}
}
VPackSlice isDone = data.get(Utils::doneKey);
if (isDone.isBool() && isDone.getBool()) {
_doneCount++;

View File

@ -55,11 +55,13 @@ class Conductor {
Mutex _finishedGSSMutex; // prevents concurrent calls to finishedGlobalStep
VocbaseGuard _vocbaseGuard;
const unsigned int _executionNumber;
std::string _algorithm;
ExecutionState _state = ExecutionState::RUNNING;
std::vector <std::string, std::unique_ptr<Aggregator>> _aggregators;
std::vector<std::shared_ptr<LogicalCollection>> _vertexCollections, _edgeCollections;
std::map<ServerID, std::vector<ShardID>> _vertexServerMap;
std::string _algorithm;
unsigned int _globalSuperstep;
int32_t _dbServerCount = 0;

View File

@ -75,6 +75,33 @@ class IntegerGraphFormat : public GraphFormat<int64_t, int64_t> {
int64_t readVertexData(void* ptr) override { return *((int64_t*)ptr); }
int64_t readEdgeData(void* ptr) override { return *((int64_t*)ptr); }
};
class FloatGraphFormat : public GraphFormat<float, float> {
const std::string _field;
const float _vDefault, _eDefault;
public:
IntegerGraphFormat(std::string const& field, float vertexNull,
float edgeNull)
: _field(field), _vDefault(vertexNull), _eDefault(edgeNull) {}
size_t copyVertexData(VPackSlice document, void* targetPtr,
size_t maxSize) override {
VPackSlice val = document.get(_field);
*((float*)targetPtr) = val.isInteger() ? val.getInt() : _vDefault;
return sizeof(float);
}
size_t copyEdgeData(VPackSlice document, void* targetPtr,
float maxSize) override {
VPackSlice val = document.get(_field);
*((float*)targetPtr) = val.isInteger() ? val.getInt() : _eDefault;
return sizeof(float);
}
float readVertexData(void* ptr) override { return *((float*)ptr); }
float readEdgeData(void* ptr) override { return *((float*)ptr); }
};
}
}
#endif

View File

@ -34,8 +34,8 @@ struct MessageCombiner {
virtual M combine(M const& firstValue, M const& secondValue) const = 0;
};
struct MinIntegerCombiner : public MessageCombiner<int64_t> {
MinIntegerCombiner() {}
struct IntegerMinCombiner : public MessageCombiner<int64_t> {
IntegerMinCombiner() {}
int64_t combine(int64_t const& firstValue,
int64_t const& secondValue) const override {
return firstValue < secondValue ? firstValue : secondValue;

View File

@ -54,19 +54,26 @@ OutgoingCache<V, E, M>::~OutgoingCache() {
template <typename V, typename E, typename M>
void OutgoingCache<V, E, M>::clear() {
// TODO better way?
/*for (auto const& it : _map) {
for (auto const& it2 : it.second) {
it2.second->clear(); // clears VPackBuilder
}
}*/
_map.clear();
_containedMessages = 0;
}
static void resolveResponsibleShard(ClusterInfo* ci, LogicalCollection* info,
std::string const& vertexKey,
std::string& responsibleShard) {
static inline LogicalCollection* resolveCollection(ClusterInfo* ci,
std::string const& database,
std::string const& collectionName,
std::map<std::string, std::string> const& collectionPlanIdMap) {
auto const& it = collectionPlanIdMap.find(collectionName);
if (it == collectionPlanIdMap.end()) {
LOG(ERR) << "Collection this messages is going to is unkown";
THROW_ARANGO_EXCEPTION(TRI_ERROR_FORBIDDEN);
}
std::shared_ptr<LogicalCollection> collectionInfo(ci->getCollection(database, it->second));
return collectionInfo.get();
}
static inline void resolveShard(ClusterInfo* ci, LogicalCollection* info,
std::string const& vertexKey,
std::string& responsibleShard) {
bool usesDefaultShardingAttributes;
VPackBuilder partial;
partial.openObject();
@ -86,15 +93,16 @@ static void resolveResponsibleShard(ClusterInfo* ci, LogicalCollection* info,
template <typename V, typename E, typename M>
void OutgoingCache<V, E, M>::sendMessageTo(std::string const& toValue,
M const& data) {
assert(_combiner);
std::string _key = Utils::vertexKeyFromToValue(toValue);
std::string collectionName = Utils::collectionFromToValue(toValue);
std::size_t pos = toValue.find('/');
std::string _key = toValue.substr(pos + 1, toValue.length() - pos - 1);
std::string collectionName = toValue.substr(0, pos);
LOG(INFO) << "Adding outgoing messages for " << collectionName << "/" << _key;
std::shared_ptr<LogicalCollection> collectionInfo(_ci->getCollection(_state->database(),
collectionName));
LogicalCollection *coll = resolveCollection(_ci, _state->database(),
collectionName, _state->collectionPlanIdMap());
ShardID responsibleShard;
resolveResponsibleShard(_ci, _collInfo.get(), _key, responsibleShard);
resolveShard(_ci, coll, _key, responsibleShard);
LOG(INFO) << "Responsible shard: " << responsibleShard;
std::vector<ShardID> const& localShards = _state->localVertexShardIDs();

View File

@ -55,12 +55,10 @@ class OutgoingCache {
/// @brief two stage map: shard -> vertice -> message
std::unordered_map<ShardID, std::unordered_map<std::string, M>> _map;
std::shared_ptr<WorkerState<V, E, M>> _state;
std::shared_ptr<LogicalCollection> _collInfo;
ClusterInfo* _ci;
std::string _baseUrl;
/// @brief current number of vertices stored
std::map<std::string, std::string> _collectionPlanIdMap;
size_t _containedMessages = 0;
size_t _sendMessages = 0;
};

View File

@ -43,7 +43,7 @@ std::string const Utils::executionNumberKey = "exn";
std::string const Utils::totalVertexCount = "vertexCount";
std::string const Utils::totalEdgeCount = "edgeCount";
std::string const Utils::shardPlanMapKey = "shardPlanMap";
std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap";
std::string const Utils::vertexShardsListKey = "vertexShards";
std::string const Utils::edgeShardsListKey = "edgeShards";
@ -56,6 +56,9 @@ std::string const Utils::messagesKey = "msgs";
std::string const Utils::senderKey = "sender";
std::string const Utils::doneKey = "done";
std::string const Utils::parameterMapKey = "params";
std::string const Utils::aggregatorsKey = "aggregators";
std::string const Utils::edgeShardingKey = "_vertex";
std::string Utils::baseUrl(std::string dbName) {

View File

@ -46,10 +46,11 @@ class Utils {
static std::string const executionNumberKey;
static std::string const algorithmKey;
static std::string const coordinatorIdKey;
static std::string const totalVertexCount;
static std::string const totalEdgeCount;
static std::string const shardPlanMapKey;
static std::string const collectionPlanIdMapKey;
static std::string const vertexShardsListKey;
static std::string const edgeShardsListKey;
@ -58,6 +59,9 @@ class Utils {
static std::string const senderKey;
static std::string const doneKey;
static std::string const parameterMapKey;
static std::string const aggregatorsKey;
static std::string const edgeShardingKey;
static std::string baseUrl(std::string dbName);
static std::string collectionFromToValue(std::string const& graphKey);

View File

@ -87,6 +87,7 @@ Worker<V, E, M>::~Worker() {
LOG(INFO) << "Called ~Worker()";
const size_t threadNum = 1;
_workerPool.reset(new ThreadPool(static_cast<size_t>(threadNum), "Pregel Worker"));
_ctx->algorithm()->aggregators(_aggregators);
}
/// @brief Setup next superstep

View File

@ -57,11 +57,11 @@ class Worker : public IWorker {
void finalizeExecution(VPackSlice data) override;
private:
// Mutex _messagesMutex; TODO figure this out
bool _running = true;
std::shared_ptr<WorkerState<V, E, M>> _ctx;
std::shared_ptr<GraphStore<V, E>> _graphStore;
std::unique_ptr<basics::ThreadPool> _workerPool;
bool _running = true;
std::vector <std::string, std::unique_ptr<Aggregator>> _aggregators;
void workerJobIsDone(bool allVerticesHalted);
};

View File

@ -38,9 +38,9 @@ WorkerState<V, E, M>::WorkerState(Algorithm<V, E, M>* algo, DatabaseID dbname,
VPackSlice vertexShardIDs = params.get(Utils::vertexShardsListKey);
VPackSlice edgeShardIDs = params.get(Utils::edgeShardsListKey);
VPackSlice execNum = params.get(Utils::executionNumberKey);
VPackSlice planIDs = params.get(Utils::shardPlanMapKey);
VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey);
if (!coordID.isString() || !vertexShardIDs.isArray() ||
!edgeShardIDs.isArray() || !execNum.isInteger()) {
!edgeShardIDs.isArray() || !execNum.isInteger() || !collectionPlanIdMap.isObject()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Supplied bad parameters to worker");
}
@ -63,9 +63,9 @@ WorkerState<V, E, M>::WorkerState(Algorithm<V, E, M>* algo, DatabaseID dbname,
LOG(INFO) << name;
}
for (auto const& it : VPackObjectIterator(planIDs)) {
}
for (auto const& it : VPackObjectIterator(collectionPlanIdMap)) {
_collectionPlanIdMap[it.key.toString()] = it.value.toString();
}
auto format = algo->messageFormat();
auto combiner = algo->messageCombiner();

View File

@ -25,7 +25,7 @@
#include <velocypack/velocypack-aliases.h>
#include "Basics/Common.h"
#include "Cluster/CLusterInfo.h"
#include "Cluster/ClusterInfo.h"
#include "Algorithm.h"
@ -78,6 +78,8 @@ class WorkerState {
}
std::shared_ptr<Algorithm<V, E, M>> algorithm() { return _algorithm; }
std::map<CollectionID, std::string> const& collectionPlanIdMap() {return _collectionPlanIdMap;};
private:
/// @brief guard to make sure the database is not dropped while used by us
@ -89,6 +91,7 @@ class WorkerState {
std::string _coordinatorId;
const std::string _database;
std::vector<ShardID> _localVertexShardIDs, _localEdgeShardIDs;
std::map<std::string, std::string> _collectionPlanIdMap;
std::shared_ptr<IncomingCache<M>> _readCache, _writeCache;
void swapIncomingCaches(); // only call when message receiving is locked

View File

@ -111,5 +111,5 @@ RestStatus RestPregelHandler::execute() {
LOG(ERR) << "Exception";
}
return status::DONE;
return RestStatus::DONE;
}