1
0
Fork 0

PageRank compensation

This commit is contained in:
Simon Grätzer 2016-12-09 14:26:57 +01:00
parent d293d215c1
commit a47fb047f4
6 changed files with 110 additions and 39 deletions

View File

@ -99,6 +99,28 @@ public:
VPackValue vpackValue() override { return VPackValue(_value); };
};
template<typename T>
class SumAggregator : public Aggregator {
static_assert(std::is_arithmetic<T>::value, "Type must be numeric");
T _value;
public:
SumAggregator(T val) : Aggregator(true), _value(val) {}
void aggregate(void const* valuePtr) override {
_value += *((T*)valuePtr);
};
void aggregate(VPackSlice slice) override {
_value += slice.getNumber<T>();
}
void const* getValue() const override { return &_value; };
/*void setValue(VPackSlice slice) override {
_value = (float)slice.getDouble();
}*/
VPackValue vpackValue() override { return VPackValue(_value); };
};
}
}
#endif

View File

@ -112,9 +112,32 @@ PageRankAlgorithm::createComputation(uint64_t gss) const {
}
struct PRCompensation : public VertexCompensation<float, float, float> {
PRCompensation() {}
void compensate(bool inLostPartition) override {
const uint32_t *step = getAggregatedValue<uint32_t>("step");
if (step) {
if (*step == 0 && !inLostPartition) {
uint32_t c = 1;
aggregate("nonfailedCount", &c);
aggregate("totalrank", mutableVertexData<float>());
} else if (*step == 1) {
const float *scale;
float *data = mutableVertexData<float>();
if (inLostPartition) {
scale = getAggregatedValue<float>("scale1");
} else {
scale = getAggregatedValue<float>("scale2");
}
if (scale && *scale != 0) {
*data = *scale;
}
}
}
}
};
@ -125,7 +148,15 @@ PageRankAlgorithm::createCompensation(uint64_t gss) const {
Aggregator* PageRankAlgorithm::aggregator(std::string const& name) const {
if (name == "convergence") {
return new FloatMaxAggregator(0);
return new FloatMaxAggregator(-1);
} else if (name == "nonfailedCount") {
return new SumAggregator<uint32_t>(0);
} else if (name == "totalrank") {
return new SumAggregator<float>(0);
} else if (name == "step") {
return new ValueAggregator<uint32_t>(0);
} else if (name == "scale1" || name == "scale2") {
return new ValueAggregator<float>(-1);
}
return nullptr;
}
@ -137,6 +168,30 @@ struct PRMasterContext : public MasterContext {
LOG(INFO) << "Current convergence level" << *convergence;
return true;
}
int32_t recoveryStep = -1;
bool preCompensation(uint64_t gss) {
recoveryStep++;
aggregate("step", &recoveryStep);
return true;
}
bool postCompensation(uint64_t gss) {
if (recoveryStep == 0) {
const float* totalrank = getAggregatedValue<float>("totalrank");
const uint32_t* nonfailedCount = getAggregatedValue<uint32_t>("nonfailedCount");
if (totalrank && *totalrank != 0 && nonfailedCount && *nonfailedCount != 0) {
float scale1 = 1.0f / this->vertexCount();
float scale2 = ((*nonfailedCount) * 1.0f) / (this->vertexCount() * (*totalrank));
aggregate("scale1", &scale1);
aggregate("scale2", &scale2);
return true;
}
}
recoveryStep = -1;
return false;
}
};

View File

@ -36,7 +36,7 @@ class MasterContext {
friend class Conductor;
uint64_t _vertexCount, _edgeCount;
const AggregatorUsage* _aggregators;
AggregatorUsage* _aggregators;
protected:
template <typename T>

View File

@ -87,38 +87,31 @@ void RecoveryManager::monitorCollections(
void RecoveryManager::_monitorShard(CollectionID const& cid,
ShardID const& shard) {
std::function<bool(VPackSlice const& result)> listener =
[this](VPackSlice const& result) {
/*if (result.isObject() && result.length() == (size_t)numberOfShards) {
std::string tmpMsg = "";
bool tmpHaveError = false;
[this, shard](VPackSlice const& result) {
auto const& conductors = _listeners.find(shard);
if (conductors == _listeners.end()) {
return false;
}
for (auto const& p : VPackObjectIterator(result)) {
if (arangodb::basics::VelocyPackHelper::getBooleanValue(
p.value, "error", false)) {
tmpHaveError = true;
tmpMsg += " shardID:" + p.key.copyString() + ":";
tmpMsg += arangodb::basics::VelocyPackHelper::getStringValue(
p.value, "errorMessage", "");
if (p.value.hasKey("errorNum")) {
VPackSlice const errorNum = p.value.get("errorNum");
if (errorNum.isNumber()) {
tmpMsg += " (errNum=";
tmpMsg += basics::StringUtils::itoa(
errorNum.getNumericValue<uint32_t>());
tmpMsg += ")";
}
}
}
}
if (tmpHaveError) {
*errMsg = "Error in creation of collection:" + tmpMsg;
*dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_COLLECTION;
return true;
}
*dbServerResult = setErrormsg(TRI_ERROR_NO_ERROR, *errMsg);
}*/
// PregelFeature::instance()->notifyConductors();
if (result.isArray()) {
if (result.length() > 0) {
ServerID nextPrimary = result.at(0).copyString();
auto const& currentPrimary = _primaryServers.find(shard);
if (currentPrimary != _primaryServers.end()
&& currentPrimary->second != nextPrimary) {
_primaryServers[shard] = nextPrimary;
for (Conductor *cc : conductors->second) {
cc->startRecovery();
}
}
} else {
for (Conductor *cc : conductors->second) {
cc->cancel();
}
}
}
LOG(INFO) << result.toString();
return true;
@ -131,7 +124,7 @@ void RecoveryManager::_monitorShard(CollectionID const& cid,
std::shared_ptr<std::vector<ServerID>> servers =
ClusterInfo::instance()->getResponsibleServer(shard);
if (servers->size() > 0) {
_primaryServer[shard] = servers->at(0);
_primaryServers[shard] = servers->at(0);
auto call =
std::make_shared<AgencyCallback>(_agency, path, listener, true, false);

View File

@ -45,7 +45,7 @@ class RecoveryManager {
AgencyCallbackRegistry *_agencyCallbackRegistry;//weak
std::map<ShardID, std::set<Conductor*>> _listeners;
std::map<ShardID, ServerID> _primaryServer;
std::map<ShardID, ServerID> _primaryServers;
std::map<ShardID, std::shared_ptr<AgencyCallback>> _agencyCallbacks;
void _monitorShard(CollectionID const& cid, ShardID const& shard);

View File

@ -12,16 +12,17 @@ var graph_module = require("@arangodb/general-graph");
var fs = require('fs');
module.exports = function (gname, filename) {
var vColl = gname+"_nodes", eColl = gname+"_edges";
var vColl = gname+"_vertices", eColl = gname+"_edges";
var graph;
var exists = graph_module._list().indexOf(gname) != -1;
if (!exists) {
graph = graph_module._create(gname);
db._create(vColl, {numberOfShards: 4});
db._create(vColl, {numberOfShards: 2, replicationFactor:2});
graph._addVertexCollection(vColl);
db._createEdgeCollection(eColl, {
numberOfShards: 4,
numberOfShards: 2,
replicationFactor: 2,
shardKeys:["_vertex"],
distributeShardsLike:vColl});