1
0
Fork 0

Testing randomized bucket access

This commit is contained in:
Simon Grätzer 2017-02-11 21:40:35 +01:00
parent 5f2f6727d3
commit becca2bdbb
3 changed files with 43 additions and 26 deletions

View File

@ -121,17 +121,25 @@ void ArrayInCache<M>::_set(prgl_shard_t shard, std::string const& key,
}
template <typename M>
void ArrayInCache<M>::mergeCache(InCache<M> const* otherCache) {
void ArrayInCache<M>::mergeCache(WorkerConfig const& config, InCache<M> const* otherCache) {
ArrayInCache<M>* other = (ArrayInCache<M>*)otherCache;
this->_containedMessageCount += other->_containedMessageCount;
for (auto const& pair : other->_shardMap) {
MUTEX_LOCKER(guard, this->_bucketLocker[pair.first]);
HMap& vertexMap(_shardMap[pair.first]);
for (auto& vertexMessage : pair.second) {
std::vector<M>& a = vertexMap[vertexMessage.first];
std::vector<M> const& b = vertexMessage.second;
a.insert(a.end(), b.begin(), b.end());
// ranomize access to buckets
std::set<prgl_shard_t> const& shardIDs = config.localPregelShardIDs();
std::vector<prgl_shard_t> randomized(shardIDs.begin(), shardIDs.end());
std::random_shuffle(randomized.begin(), randomized.end());
for (prgl_shard_t shardId : randomized) {
auto const& it = other->_shardMap.find(shardId);
if (it != other->_shardMap.end()) {
MUTEX_LOCKER(guard, this->_bucketLocker[shardId]);
HMap& myVertexMap = _shardMap[shardId];
for (auto& vertexMessage : it->second) {
std::vector<M>& a = myVertexMap[vertexMessage.first];
std::vector<M> const& b = vertexMessage.second;
a.insert(a.end(), b.begin(), b.end());
}
}
}
}
@ -215,20 +223,28 @@ void CombiningInCache<M>::_set(prgl_shard_t shard, std::string const& key,
}
template <typename M>
void CombiningInCache<M>::mergeCache(InCache<M> const* otherCache) {
void CombiningInCache<M>::mergeCache(WorkerConfig const& config,
InCache<M> const* otherCache) {
CombiningInCache<M>* other = (CombiningInCache<M>*)otherCache;
this->_containedMessageCount += other->_containedMessageCount;
for (auto const& pair : other->_shardMap) {
MUTEX_LOCKER(guard, this->_bucketLocker[pair.first]);
HMap& vertexMap = _shardMap[pair.first];
for (auto& vertexMessage : pair.second) {
auto vmsg = vertexMap.find(vertexMessage.first);
if (vmsg != vertexMap.end()) { // got a message for the same vertex
_combiner->combine(vmsg->second, vertexMessage.second);
} else {
vertexMap.insert(vertexMessage);
// ranomize access to buckets
std::set<prgl_shard_t> const& shardIDs = config.localPregelShardIDs();
std::vector<prgl_shard_t> randomized(shardIDs.begin(), shardIDs.end());
std::random_shuffle(randomized.begin(), randomized.end());
for (prgl_shard_t shardId : randomized) {
auto const& it = other->_shardMap.find(shardId);
if (it != other->_shardMap.end()) {
MUTEX_LOCKER(guard, this->_bucketLocker[shardId]);
HMap& myVertexMap = _shardMap[shardId];
for (auto& vertexMessage : it->second) {
auto vmsg = myVertexMap.find(vertexMessage.first);
if (vmsg != myVertexMap.end()) { // got a message for the same vertex
_combiner->combine(vmsg->second, vertexMessage.second);
} else {
myVertexMap.insert(vertexMessage);
}
}
}
}

View File

@ -73,7 +73,7 @@ class InCache {
void storeMessage(prgl_shard_t shard, std::string const& vertexId,
M const& data);
virtual void mergeCache(InCache<M> const* otherCache) = 0;
virtual void mergeCache(WorkerConfig const& config, InCache<M> const* otherCache) = 0;
/// @brief get messages for vertex id. (Don't use keys from _from or _to
/// directly, they contain the collection name)
virtual MessageIterator<M> getMessages(prgl_shard_t shard,
@ -103,7 +103,7 @@ class ArrayInCache : public InCache<M> {
public:
ArrayInCache(WorkerConfig const* config, MessageFormat<M> const* format);
void mergeCache(InCache<M> const* otherCache) override;
void mergeCache(WorkerConfig const& config, InCache<M> const* otherCache) override;
MessageIterator<M> getMessages(prgl_shard_t shard,
std::string const& key) override;
void clear() override;
@ -131,7 +131,7 @@ class CombiningInCache : public InCache<M> {
MessageCombiner<M> const* combiner() const { return _combiner; }
void mergeCache(InCache<M> const* otherCache) override;
void mergeCache(WorkerConfig const& config, InCache<M> const* otherCache) override;
MessageIterator<M> getMessages(prgl_shard_t shard,
std::string const& key) override;
void clear() override;

View File

@ -390,7 +390,7 @@ bool Worker<V, E, M>::_processVertices(
}
// merge thread local messages, _writeCache does locking
_writeCache->mergeCache(inCache.get());
_writeCache->mergeCache(_config, inCache.get());
// TODO ask how to implement message sending without waiting for a response
MessageStats stats;
@ -452,8 +452,9 @@ void Worker<V, E, M>::_finishedProcessing() {
size_t total = _graphStore->localVertexCount();
if (total > currentAVCount) {
if (_config.asynchronousMode()) {
// just process these vertices in the next superstep
ReadLocker<ReadWriteLock> rguard(&_cacheRWLock);
_writeCache->mergeCache(_readCache); // compute in next superstep
_writeCache->mergeCache(_config, _readCache); // compute in next superstep
_messageStats.sendCount += _readCache->containedMessageCount();
} else {
// TODO call _startProcessing ???