////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////

#include "IncomingCache.h"
#include "Pregel/CommonFormats.h"
#include "Pregel/Utils.h"
#include "Pregel/WorkerConfig.h"

#include "Basics/MutexLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"

#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

#include <algorithm>
#include <chrono>
#include <random>
#include <thread>

using namespace arangodb;
using namespace arangodb::pregel;

template <typename M>
InCache<M>::InCache(MessageFormat<M> const* format)
    : _containedMessageCount(0), _format(format) {}

template <typename M>
void InCache<M>::parseMessages(VPackSlice const& incomingData) {
  // every packet contains one shard
  VPackSlice shardSlice = incomingData.get(Utils::shardIdKey);
  VPackSlice messages = incomingData.get(Utils::messagesKey);

  // temporary variables
  VPackValueLength i = 0;
  VPackStringRef key;
  PregelShard shard = (PregelShard)shardSlice.getUInt();

  std::lock_guard guard(this->_bucketLocker[shard]);
  // the messages array alternates vertex keys and values:
  // [key1, value1, key2, value2, ...]
  for (VPackSlice current : VPackArrayIterator(messages)) {
    if (i % 2 == 0) {  // TODO support multiple recipients
      key = current;
    } else {
      TRI_ASSERT(!key.empty());
      if (current.isArray()) {
        VPackValueLength c = 0;
        for (VPackSlice val : VPackArrayIterator(current)) {
          M newValue;
          _format->unwrapValue(val, newValue);
          _set(shard, key, newValue);
          c++;
        }
        this->_containedMessageCount += c;
      } else {
        M newValue;
        _format->unwrapValue(current, newValue);
        _set(shard, key, newValue);
        this->_containedMessageCount++;
      }
    }
    i++;
  }

  if (i % 2 != 0) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "There must always be an even number of entries in the message array");
  }
}

template <typename M>
void InCache<M>::storeMessageNoLock(PregelShard shard,
                                    VPackStringRef const& vertexId,
                                    M const& data) {
  this->_set(shard, vertexId, data);
  this->_containedMessageCount++;
}

template <typename M>
void InCache<M>::storeMessage(PregelShard shard,
                              VPackStringRef const& vertexId, M const& data) {
  std::lock_guard guard(this->_bucketLocker[shard]);
  this->_set(shard, vertexId, data);
  this->_containedMessageCount++;
}
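// A minimal sketch (not compiled here) of how a sender could assemble a
// packet for parseMessages() above. The serialization call on the format is
// an assumption and may differ from the real MessageFormat<M> API:
//
//   VPackBuilder b;
//   b.openObject();
//   b.add(Utils::shardIdKey, VPackValue(shard));
//   b.add(Utils::messagesKey, VPackValue(VPackValueType::Array));
//   b.add(VPackValue("vertexKey"));  // recipient vertex key
//   format->addValue(b, message);    // assumed MessageFormat<M> helper
//   b.close();                       // close the messages array
//   b.close();                       // close the packet object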
// ================== ArrayIncomingCache ==================

template <typename M>
ArrayInCache<M>::ArrayInCache(WorkerConfig const* config,
                              MessageFormat<M> const* format)
    : InCache<M>(format) {
  if (config != nullptr) {
    std::set<PregelShard> const& shardIDs = config->localPregelShardIDs();
    // one mutex per shard, we will see how this scales
    for (PregelShard shardID : shardIDs) {
      this->_bucketLocker[shardID];
      _shardMap[shardID];
    }
  }
}

template <typename M>
void ArrayInCache<M>::_set(PregelShard shard, VPackStringRef const& key,
                           M const& newValue) {
  HMap& vertexMap(_shardMap[shard]);
  vertexMap[key.toString()].push_back(newValue);
}

template <typename M>
void ArrayInCache<M>::mergeCache(WorkerConfig const& config,
                                 InCache<M> const* otherCache) {
  ArrayInCache<M>* other = (ArrayInCache<M>*)otherCache;
  this->_containedMessageCount += other->_containedMessageCount;

  // randomize access to buckets, don't wait for the lock
  std::set<PregelShard> const& shardIDs = config.localPregelShardIDs();
  std::vector<PregelShard> randomized(shardIDs.begin(), shardIDs.end());
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle(randomized.begin(), randomized.end(), g);

  size_t i = 0;
  do {
    i = (i + 1) % randomized.size();
    PregelShard shardId = randomized[i];
    auto const& it = other->_shardMap.find(shardId);
    if (it != other->_shardMap.end() && it->second.size() > 0) {
      std::unique_lock guard(this->_bucketLocker[shardId], std::try_to_lock);
      if (!guard) {
        if (i == 0) {  // eventually we hit the last one
          std::this_thread::sleep_for(
              std::chrono::microseconds(100));  // don't busy wait
        }
        continue;
      }

      // only access the bucket after we acquired the lock
      HMap& myVertexMap = _shardMap[shardId];
      for (auto& vertexMessage : it->second) {
        std::vector<M>& a = myVertexMap[vertexMessage.first];
        std::vector<M> const& b = vertexMessage.second;
        a.insert(a.end(), b.begin(), b.end());
      }
    }

    randomized.erase(randomized.begin() + i);
  } while (randomized.size() > 0);
}

template <typename M>
MessageIterator<M> ArrayInCache<M>::getMessages(PregelShard shard,
                                                VPackStringRef const& key) {
  std::string keyS = key.toString();
  HMap const& vertexMap = _shardMap[shard];
  auto vmsg = vertexMap.find(keyS);
  if (vmsg != vertexMap.end()) {
    M const* ptr = vmsg->second.data();
    return MessageIterator<M>(ptr, vmsg->second.size());
  } else {
    return MessageIterator<M>();
  }
}

template <typename M>
void ArrayInCache<M>::clear() {
  for (auto& pair : _shardMap) {  // keep the keys
    // MUTEX_LOCKER(guard, this->_bucketLocker[pair.first]);
    pair.second.clear();
  }
  this->_containedMessageCount = 0;
}

/// Deletes one entry. DOES NOT LOCK
template <typename M>
void ArrayInCache<M>::erase(PregelShard shard, VPackStringRef const& key) {
  std::string keyS = key.toString();
  HMap& vertexMap = _shardMap[shard];
  auto const& it = vertexMap.find(keyS);
  if (it != vertexMap.end()) {
    vertexMap.erase(it);
    this->_containedMessageCount--;
  }
}

/// Calls function for each entry. DOES NOT LOCK
template <typename M>
void ArrayInCache<M>::forEach(
    std::function<void(PregelShard, VPackStringRef const&, M const&)> func) {
  for (auto const& pair : _shardMap) {
    PregelShard shard = pair.first;
    HMap const& vertexMap = pair.second;
    for (auto& vertexMsgs : vertexMap) {
      for (M const& val : vertexMsgs.second) {
        func(shard, VPackStringRef(vertexMsgs.first), val);
      }
    }
  }
}

// ================== CombiningIncomingCache ==================

template <typename M>
CombiningInCache<M>::CombiningInCache(WorkerConfig const* config,
                                      MessageFormat<M> const* format,
                                      MessageCombiner<M> const* combiner)
    : InCache<M>(format), _combiner(combiner) {
  if (config != nullptr) {
    std::set<PregelShard> const& shardIDs = config->localPregelShardIDs();
    // one mutex per shard, we will see how this scales
    for (PregelShard shardID : shardIDs) {
      this->_bucketLocker[shardID];
      _shardMap[shardID];
    }
  }
}

template <typename M>
void CombiningInCache<M>::_set(PregelShard shard, VPackStringRef const& key,
                               M const& newValue) {
  std::string keyS = key.toString();
  HMap& vertexMap = _shardMap[shard];
  auto vmsg = vertexMap.find(keyS);
  if (vmsg != vertexMap.end()) {  // got a message for the same vertex
    _combiner->combine(vmsg->second, newValue);
  } else {
    vertexMap.insert(std::make_pair(std::move(keyS), newValue));
  }
}
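// A minimal sketch of what _combiner->combine() above relies on: a
// MessageCombiner<M> folds a new message into the stored one in place, so at
// most one value is kept per vertex. Assuming the MessageCombiner<M>
// interface matching the combine(M&, M const&) call used in this file, a sum
// combiner for float messages could look like this:
//
//   struct SumCombiner : public MessageCombiner<float> {
//     void combine(float& firstValue, float const& secondValue) const override {
//       firstValue += secondValue;
//     }
//   };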
template <typename M>
void CombiningInCache<M>::mergeCache(WorkerConfig const& config,
                                     InCache<M> const* otherCache) {
  CombiningInCache<M>* other = (CombiningInCache<M>*)otherCache;
  this->_containedMessageCount += other->_containedMessageCount;

  // randomize access to buckets, don't wait for the lock
  std::set<PregelShard> const& shardIDs = config.localPregelShardIDs();
  std::vector<PregelShard> randomized(shardIDs.begin(), shardIDs.end());
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle(randomized.begin(), randomized.end(), g);

  size_t i = 0;
  do {
    i = (i + 1) % randomized.size();
    PregelShard shardId = randomized[i];
    auto const& it = other->_shardMap.find(shardId);
    if (it != other->_shardMap.end() && it->second.size() > 0) {
      std::unique_lock guard(this->_bucketLocker[shardId], std::try_to_lock);
      if (!guard) {
        if (i == 0) {  // eventually we hit the last one
          std::this_thread::sleep_for(
              std::chrono::microseconds(100));  // don't busy wait
        }
        continue;
      }

      // only access the bucket after we acquired the lock
      HMap& myVertexMap = _shardMap[shardId];
      for (auto& vertexMessage : it->second) {
        auto vmsg = myVertexMap.find(vertexMessage.first);
        if (vmsg != myVertexMap.end()) {  // got a message for the same vertex
          _combiner->combine(vmsg->second, vertexMessage.second);
        } else {
          myVertexMap.insert(vertexMessage);
        }
      }
    }

    randomized.erase(randomized.begin() + i);
  } while (randomized.size() > 0);
}

template <typename M>
MessageIterator<M> CombiningInCache<M>::getMessages(PregelShard shard,
                                                    VPackStringRef const& key) {
  std::string keyS = key.toString();
  HMap const& vertexMap = _shardMap[shard];
  auto vmsg = vertexMap.find(keyS);
  if (vmsg != vertexMap.end()) {
    return MessageIterator<M>(&vmsg->second);
  } else {
    return MessageIterator<M>();
  }
}

template <typename M>
void CombiningInCache<M>::clear() {
  for (auto& pair : _shardMap) {
    pair.second.clear();
  }
  this->_containedMessageCount = 0;
}

/// Deletes one entry. DOES NOT LOCK
template <typename M>
void CombiningInCache<M>::erase(PregelShard shard, VPackStringRef const& key) {
  std::string keyS = key.toString();
  HMap& vertexMap = _shardMap[shard];
  auto const& it = vertexMap.find(keyS);
  if (it != vertexMap.end()) {
    vertexMap.erase(it);
    this->_containedMessageCount--;
  }
}

/// Calls function for each entry. DOES NOT LOCK
template <typename M>
void CombiningInCache<M>::forEach(
    std::function<void(PregelShard shard, VPackStringRef const& key, M const&)> func) {
  for (auto const& pair : _shardMap) {
    PregelShard shard = pair.first;
    HMap const& vertexMap = pair.second;
    for (auto& vertexMessage : vertexMap) {
      func(shard, VPackStringRef(vertexMessage.first), vertexMessage.second);
    }
  }
}

// template types to create
template class arangodb::pregel::InCache<int64_t>;
template class arangodb::pregel::InCache<uint64_t>;
template class arangodb::pregel::InCache<float>;
template class arangodb::pregel::InCache<double>;
template class arangodb::pregel::ArrayInCache<int64_t>;
template class arangodb::pregel::ArrayInCache<uint64_t>;
template class arangodb::pregel::ArrayInCache<float>;
template class arangodb::pregel::ArrayInCache<double>;
template class arangodb::pregel::CombiningInCache<int64_t>;
template class arangodb::pregel::CombiningInCache<uint64_t>;
template class arangodb::pregel::CombiningInCache<float>;
template class arangodb::pregel::CombiningInCache<double>;

// algo specific
template class arangodb::pregel::InCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::ArrayInCache<SenderMessage<uint64_t>>;
template class arangodb::pregel::CombiningInCache<SenderMessage<uint64_t>>;

template class arangodb::pregel::InCache<SenderMessage<double>>;
template class arangodb::pregel::ArrayInCache<SenderMessage<double>>;
template class arangodb::pregel::CombiningInCache<SenderMessage<double>>;

template class arangodb::pregel::InCache<DMIDMessage>;
template class arangodb::pregel::ArrayInCache<DMIDMessage>;
template class arangodb::pregel::CombiningInCache<DMIDMessage>;

template class arangodb::pregel::InCache<HLLCounter>;
template class arangodb::pregel::ArrayInCache<HLLCounter>;
template class arangodb::pregel::CombiningInCache<HLLCounter>;
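// A hedged usage sketch; the format type, combiner, config and shard id are
// assumptions, not part of this file. It shows how combining behaves from the
// caller's point of view:
//
//   FloatMessageFormat fmt;   // hypothetical MessageFormat<float>
//   SumCombiner cmb;          // see the combiner sketch above
//   CombiningInCache<float> cache(config, &fmt, &cmb);
//   cache.storeMessage(shard, VPackStringRef("v1"), 1.5f);
//   cache.storeMessage(shard, VPackStringRef("v1"), 2.5f);
//   // getMessages(shard, VPackStringRef("v1")) now yields the single
//   // combined value 4.0f instead of two separate messages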