////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
///     http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////

#include "IncomingCache.h"
#include "Utils.h"

#include "Basics/MutexLocker.h"
#include "Basics/StaticStrings.h"
#include "Basics/VelocyPackHelper.h"

#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>

//#include <cuckoohash_map.hh>
//#include <city_hasher.hh>

using namespace arangodb;
using namespace arangodb::pregel;

// incoming messages are serialized as a flat array of
// (shard id, vertex key, value or array of values) triples
template <typename M>
void InCache<M>::parseMessages(VPackSlice incomingMessages) {
  prgl_shard_t shard = 0;
  std::string key;
  VPackValueLength i = 0;
  for (VPackSlice current : VPackArrayIterator(incomingMessages)) {
    if (i % 3 == 0) {
      shard = (prgl_shard_t)current.getUInt();
    } else if (i % 3 == 1) {  // TODO support multiple recipients
      key = current.copyString();
    } else {
      MUTEX_LOCKER(guard, this->_writeLock);
      if (current.isArray()) {
        for (VPackSlice val : VPackArrayIterator(current)) {
          M newValue;
          _format->unwrapValue(val, newValue);
          _set(shard, key, newValue);
        }
      } else {
        M newValue;
        _format->unwrapValue(current, newValue);
        _set(shard, key, newValue);
      }
    }
    i++;
  }
  if (i % 3 != 0) {
    THROW_ARANGO_EXCEPTION_MESSAGE(
        TRI_ERROR_BAD_PARAMETER,
        "There must always be a multiple of 3 entries in messages");
  }
}

template <typename M>
void InCache<M>::setDirect(prgl_shard_t shard, std::string const& vertexId,
                           M const& data) {
  MUTEX_LOCKER(guard, this->_writeLock);
  this->_set(shard, vertexId, data);
}

// ================== ArrayIncomingCache ==================

template <typename M>
void ArrayInCache<M>::_set(prgl_shard_t shard, std::string const& key,
                           M const& newValue) {
  this->_receivedMessageCount++;
  HMap& vertexMap = _shardMap[shard];
  vertexMap[key].push_back(newValue);
}

template <typename M>
void ArrayInCache<M>::mergeCache(InCache<M> const* otherCache) {
  MUTEX_LOCKER(guard, this->_writeLock);
  ArrayInCache<M>* other = (ArrayInCache<M>*)otherCache;
  this->_receivedMessageCount += other->_receivedMessageCount;

  // cannot call setDirect since it locks
  for (auto const& pair : other->_shardMap) {
    HMap& vertexMap = _shardMap[pair.first];
    for (auto& vertexMessage : pair.second) {
      std::vector<M>& a = vertexMap[vertexMessage.first];
      std::vector<M> const& b = vertexMessage.second;
      a.insert(a.end(), b.begin(), b.end());
    }
  }
}

template <typename M>
MessageIterator<M> ArrayInCache<M>::getMessages(prgl_shard_t shard,
                                                std::string const& key) {
  HMap const& vertexMap = _shardMap[shard];
  auto vmsg = vertexMap.find(key);
  if (vmsg != vertexMap.end()) {
    LOG(INFO) << "Got a message for " << key;
    M const* ptr = vmsg->second.data();
    return MessageIterator<M>(ptr, vmsg->second.size());
  } else {
    LOG(INFO) << "No message for " << key;
    return MessageIterator<M>();
  }
}

template <typename M>
void ArrayInCache<M>::clear() {
  MUTEX_LOCKER(guard, this->_writeLock);
  this->_receivedMessageCount = 0;
  _shardMap.clear();
}

template <typename M>
void ArrayInCache<M>::erase(prgl_shard_t shard, std::string const& key) {
  MUTEX_LOCKER(guard, this->_writeLock);
  HMap& vertexMap(_shardMap[shard]);
  vertexMap.erase(key);
}

template <typename M>
void ArrayInCache<M>::forEach(
    std::function<void(prgl_shard_t, std::string const&, M const&)> func) {
  for (auto const& pair : _shardMap) {
    prgl_shard_t shard = pair.first;
    HMap const& vertexMap = pair.second;
    for (auto& vertexMsgs : vertexMap) {
      for (M const& val : vertexMsgs.second) {
        func(shard, vertexMsgs.first, val);
      }
    }
  }
}

// ================== CombiningIncomingCache ==================

template <typename M>
void CombiningInCache<M>::_set(prgl_shard_t shard, std::string const& key,
                               M const& newValue) {
  /*cuckoohash_map<int, std::string, CityHasher<int>> Table;
  for (int i = 0; i < 100; i++) {
    Table[i] = "hello" + std::to_string(i);
  }
  for (int i = 0; i < 101; i++) {
    std::string out;
    if (Table.find(i, out)) {
      LOG(INFO) << i << " " << out;
    } else {
      LOG(INFO) << i << " NOT FOUND";
    }
  }*/

  this->_receivedMessageCount++;
  HMap& vertexMap = _shardMap[shard];
  auto vmsg = vertexMap.find(key);
  if (vmsg != vertexMap.end()) {  // got a message for the same vertex
    _combiner->combine(vmsg->second, newValue);
  } else {
    vertexMap.insert(std::make_pair(key, newValue));
  }
}

template <typename M>
void CombiningInCache<M>::mergeCache(InCache<M> const* otherCache) {
  MUTEX_LOCKER(guard, this->_writeLock);
  CombiningInCache<M>* other = (CombiningInCache<M>*)otherCache;
  this->_receivedMessageCount += other->_receivedMessageCount;

  // cannot call setDirect since it locks
  for (auto const& pair : other->_shardMap) {
    HMap& vertexMap = _shardMap[pair.first];
    for (auto& vertexMessage : pair.second) {
      auto vmsg = vertexMap.find(vertexMessage.first);
      if (vmsg != vertexMap.end()) {  // got a message for the same vertex
        _combiner->combine(vmsg->second, vertexMessage.second);
      } else {
        vertexMap.insert(vertexMessage);
      }
    }
  }
}

template <typename M>
MessageIterator<M> CombiningInCache<M>::getMessages(prgl_shard_t shard,
                                                    std::string const& key) {
  HMap const& vertexMap(_shardMap[shard]);
  auto vmsg = vertexMap.find(key);
  if (vmsg != vertexMap.end()) {
    return MessageIterator<M>(&vmsg->second);
  } else {
    return MessageIterator<M>();
  }
}

template <typename M>
void CombiningInCache<M>::clear() {
  MUTEX_LOCKER(guard, this->_writeLock);
  this->_receivedMessageCount = 0;
  _shardMap.clear();
}

template <typename M>
void CombiningInCache<M>::erase(prgl_shard_t shard, std::string const& key) {
  MUTEX_LOCKER(guard, this->_writeLock);
  HMap& vertexMap(_shardMap[shard]);
  vertexMap.erase(key);
}

template <typename M>
void CombiningInCache<M>::forEach(
    std::function<void(prgl_shard_t, std::string const&, M const&)> func) {
  for (auto const& pair : _shardMap) {
    prgl_shard_t shard = pair.first;
    HMap const& vertexMap = pair.second;
    for (auto& vertexMessage : vertexMap) {
      func(shard, vertexMessage.first, vertexMessage.second);
    }
  }
}

// template types to create (message value types assumed: int64_t, float, double)
template class arangodb::pregel::InCache<int64_t>;
template class arangodb::pregel::InCache<float>;
template class arangodb::pregel::InCache<double>;
template class arangodb::pregel::ArrayInCache<int64_t>;
template class arangodb::pregel::ArrayInCache<float>;
template class arangodb::pregel::ArrayInCache<double>;
template class arangodb::pregel::CombiningInCache<int64_t>;
template class arangodb::pregel::CombiningInCache<float>;
template class arangodb::pregel::CombiningInCache<double>;