1
0
Fork 0
This commit is contained in:
Simon Grätzer 2017-05-12 14:07:30 +02:00
parent 8a3d7267d0
commit 958c19de35
4 changed files with 68 additions and 42 deletions

View File

@ -20,7 +20,7 @@
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "LabelPropagation.h"
#include "SLPA.h"
#include <cmath>
#include "Cluster/ClusterInfo.h"
#include "Cluster/ServerState.h"
@ -30,6 +30,7 @@
#include "Pregel/IncomingCache.h"
#include "Pregel/MasterContext.h"
#include "Pregel/VertexComputation.h"
#include "Random/RandomGenerator.h"
using namespace arangodb;
using namespace arangodb::pregel;
@ -43,7 +44,7 @@ struct SLPAComputation : public VertexComputation<SLPAValue, int8_t, uint64_t> {
uint64_t mostFrequent(MessageIterator<uint64_t> const& messages) {
TRI_ASSERT(messages.size() > 0);
if (messages.size() == 1) {
return std::min(**messages, mutableVertexData()->currentCommunity);
return **messages;
}
// most frequent value
@ -70,47 +71,60 @@ struct SLPAComputation : public VertexComputation<SLPAValue, int8_t, uint64_t> {
}
}
if (maxCounter == 1) {
return std::min(all[0], mutableVertexData()->currentCommunity);
return all[0];
}
return maxValue;
}
void compute(MessageIterator<uint64_t> const& messages) override {
SLPAValue* value = mutableVertexData();
SLPAValue* val = mutableVertexData();
if (globalSuperstep() == 0) {
sendMessageToAllEdges(value->currentCommunity);
} else {
val->memory.emplace(val->nodeId, 1);
} else if (messages.size() > 0) {
// listen to our neighbours
uint64_t newCommunity = mostFrequent(messages);
// increment the stabilization count if vertex wants to stay in the
// same partition
if (value->lastCommunity == newCommunity) {
value->stabilizationRounds++;
}
bool isUnstable = value->stabilizationRounds <= STABILISATION_ROUNDS;
bool mayChange = value->currentCommunity != newCommunity;
if (mayChange && isUnstable) {
value->lastCommunity = value->currentCommunity;
value->currentCommunity = newCommunity;
value->stabilizationRounds = 0; // reset stabilization counter
sendMessageToAllEdges(value->currentCommunity);
auto it = val->memory.find(newCommunity);
if (it == val->memory.end()) {
val->memory.emplace(newCommunity, 1);
} else {
it->second++;
}
}
voteHalt();
// Normally the SLPA algo only lets one vertex by one speak sequentially,
// which is not really well parallizable. Additionally I figure
// since a speaker only speaks to neighbours and the speaker order is random
// we can get away with letting nodes speak in turn
bool speak = val->nodeId % 2 == globalSuperstep() % 2;
if (speak) {
// speak to our neighbours
float random = RandomGenerator::interval(UINT32_MAX);
float randomDoubleValue = random / (float)UINT32_MAX;
float cumulativeSum = 0;
// Randomly select a label with probability proportional to the
// occurrence frequency of this label in its memory
uint64_t numCommunities = globalSuperstep();//val->memory.size();
for (std::pair<uint64_t, uint64_t> const& e : val->memory) {
cumulativeSum = cumulativeSum + ((float)e.second)/ numCommunities;
if(cumulativeSum >= randomDoubleValue) {
sendMessageToAllEdges(e.first);
}
}
sendMessageToAllEdges(val->nodeId);
}
}
};
VertexComputation<LPValue, int8_t, uint64_t>*
LabelPropagation::createComputation(WorkerConfig const* config) const {
return new LPComputation();
VertexComputation<SLPAValue, int8_t, uint64_t>*
SLPA::createComputation(WorkerConfig const* config) const {
return new SLPAComputation();
}
struct LPGraphFormat : public GraphFormat<LPValue, int8_t> {
struct SLPAGraphFormat : public GraphFormat<SLPAValue, int8_t> {
std::string _resultField;
uint64_t vertexIdRange = 0;
explicit LPGraphFormat(std::string const& result) : _resultField(result) {}
explicit SLPAGraphFormat(std::string const& result) : _resultField(result) {}
size_t estimatedVertexSize() const override { return sizeof(LPValue); };
size_t estimatedEdgeSize() const override { return 0; };
@ -126,10 +140,10 @@ struct LPGraphFormat : public GraphFormat<LPValue, int8_t> {
}
size_t copyVertexData(std::string const& documentId,
arangodb::velocypack::Slice document, LPValue* value,
arangodb::velocypack::Slice document, SLPAValue* value,
size_t maxSize) override {
value->currentCommunity = vertexIdRange++;
return sizeof(LPValue);
value->nodeId = vertexIdRange++;
return sizeof(SLPAValue);
}
size_t copyEdgeData(arangodb::velocypack::Slice document, int8_t* targetPtr,
@ -137,9 +151,9 @@ struct LPGraphFormat : public GraphFormat<LPValue, int8_t> {
return 0;
}
bool buildVertexDocument(arangodb::velocypack::Builder& b, const LPValue* ptr,
bool buildVertexDocument(arangodb::velocypack::Builder& b, const SLPAValue* ptr,
size_t size) const override {
b.add(_resultField, VPackValue(ptr->currentCommunity));
//b.add(_resultField, VPackValue(ptr->currentCommunity));
return true;
}
@ -149,6 +163,6 @@ struct LPGraphFormat : public GraphFormat<LPValue, int8_t> {
}
};
GraphFormat<LPValue, int8_t>* LabelPropagation::inputFormat() const {
return new LPGraphFormat(_resultField);
GraphFormat<SLPAValue, int8_t>* SLPA::inputFormat() const {
return new SLPAGraphFormat(_resultField);
}

View File

@ -36,6 +36,8 @@ namespace pregel {
// Speaker-listerner Label propagation
struct SLPAValue {
// our own initialized id
uint64_t nodeId;
/// Memory used to hold the labelId and the count
// used for memorizing communities
std::map<uint64_t, uint64_t> memory;

View File

@ -158,6 +158,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
break;
}
case RocksDBLogType::BeginTransaction: {
TRI_ASSERT(!_singleOp);
_seenBeginTransaction = true;
_currentDbId = RocksDBLogValue::databaseId(blob);
_currentTrxId = RocksDBLogValue::transactionId(blob);
@ -185,7 +186,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
// intentional fall through
}
case RocksDBLogType::SinglePut: {
_singleOpTransaction = true;
TRI_ASSERT(!_seenBeginTransaction);
_singleOp = true;
_currentDbId = RocksDBLogValue::databaseId(blob);
_currentCollectionId = RocksDBLogValue::collectionId(blob);
_currentTrxId = 0;
@ -234,7 +236,9 @@ class WALParser : public rocksdb::WriteBatch::Handler {
break;
}
case RocksDBEntryType::Document: {
TRI_ASSERT(_seenBeginTransaction || _singleOpTransaction);
TRI_ASSERT(_seenBeginTransaction && !_singleOp ||
!_seenBeginTransaction && _singleOp);
// if real transaction, we need the trx id
TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0);
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
@ -244,7 +248,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
// auto containers = getContainerIds(key);
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_singleOpTransaction) { // single op is defined to 0
if (_singleOp) { // single op is defined to 0
_builder.add("tid", VPackValue("0"));
} else {
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
@ -292,7 +296,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
// document removes, because of a drop is not transactional and
// should not appear in the WAL
if (!shouldHandleKey(key) ||
!(_seenBeginTransaction || _singleOpTransaction)) {
!(_seenBeginTransaction || _singleOp)) {
return;
}
@ -308,7 +312,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_singleOpTransaction) { // single op is defined to 0
if (_singleOp) { // single op is defined to 0
_builder.add("tid", VPackValue("0"));
} else {
_builder.add("tid", VPackValue(std::to_string(_currentTrxId)));
@ -331,7 +335,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_currentSequence = currentSequence;
_lastLogType = RocksDBLogType::Invalid;
_seenBeginTransaction = false;
_singleOpTransaction = false;
_singleOp = false;
_currentDbId = 0;
_currentTrxId = 0;
_currentCollectionId = 0;
@ -352,7 +356,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.close();
}
_seenBeginTransaction = false;
_singleOpTransaction = false;
_singleOp = false;
return _currentSequence;
}
@ -421,7 +425,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
rocksdb::SequenceNumber _currentSequence;
RocksDBLogType _lastLogType = RocksDBLogType::Invalid;
bool _seenBeginTransaction = false;
bool _singleOpTransaction = false;
bool _singleOp = false;
bool _startOfBatch = false;
TRI_voc_tick_t _currentDbId = 0;
TRI_voc_tick_t _currentTrxId = 0;

View File

@ -49,6 +49,7 @@
#include <rocksdb/status.h>
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/write_batch_with_index.h>
using namespace arangodb;
@ -315,7 +316,12 @@ void RocksDBTransactionState::prepareOperation(
bool singleOp = hasHint(transaction::Hints::Hint::SINGLE_OPERATION);
// single operations should never call this method twice
// singleOp => lastUsedColl == 0
TRI_ASSERT(!singleOp || _lastUsedCollection == 0);
// singleOp => no keys
TRI_ASSERT(!singleOp ||
_rocksTransaction->GetNumPuts() == 0 &&
_rocksTransaction->GetNumDeletes() == 0);
if (collectionId != _lastUsedCollection) {
switch (operationType) {
case TRI_VOC_DOCUMENT_OPERATION_INSERT: