//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Simon Grätzer //////////////////////////////////////////////////////////////////////////////// #include "Pregel/OutgoingCache.h" #include "Pregel/CommonFormats.h" #include "Pregel/IncomingCache.h" #include "Pregel/Utils.h" #include "Pregel/WorkerConfig.h" #include "Basics/MutexLocker.h" #include "Basics/StaticStrings.h" #include "Cluster/ServerState.h" #include "Futures/Utilities.h" #include "Network/NetworkFeature.h" #include "Network/Methods.h" #include "VocBase/LogicalCollection.h" #include #include using namespace arangodb; using namespace arangodb::pregel; template OutCache::OutCache(WorkerConfig* state, MessageFormat const* format) : _config(state), _format(format) { _baseUrl = Utils::baseUrl(_config->database(), Utils::workerPrefix); } // ================= ArrayOutCache ================== template ArrayOutCache::~ArrayOutCache() = default; template void ArrayOutCache::_removeContainedMessages() { for (auto& pair : _shardMap) { pair.second.clear(); } this->_containedMessages = 0; } template void ArrayOutCache::appendMessage(PregelShard shard, VPackStringRef const& key, M const& data) { if (this->_config->isLocalVertexShard(shard)) { if (this->_sendToNextGSS) { // I use the global cache, we need locking this->_localCacheNextGSS->storeMessage(shard, key, data); this->_sendCountNextGSS++; } else { // the local cache is always thread local this->_localCache->storeMessageNoLock(shard, key, data); this->_sendCount++; } } else { _shardMap[shard][key].push_back(data); if (++(this->_containedMessages) >= this->_batchSize) { flushMessages(); } } } template void ArrayOutCache::flushMessages() { if (this->_containedMessages == 0) { return; } // LOG_TOPIC("7af7f", INFO, Logger::PREGEL) << "Beginning to send messages to other // machines"; uint64_t gss = this->_config->globalSuperstep(); if (this->_sendToNextGSS) { gss += 1; } VPackOptions options = VPackOptions::Defaults; options.buildUnindexedArrays = true; options.buildUnindexedObjects = true; application_features::ApplicationServer& server = this->_config->vocbase()->server(); auto const& nf = server.getFeature(); network::ConnectionPool* pool = nf.pool(); network::RequestOptions reqOpts; reqOpts.skipScheduler = true; std::vector> responses; for (auto const& it : _shardMap) { PregelShard shard = it.first; std::unordered_map> const& vertexMessageMap = it.second; if (vertexMessageMap.size() == 0) { continue; } VPackBuffer buffer; VPackBuilder data(buffer, &options); data.openObject(); data.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); data.add(Utils::executionNumberKey, VPackValue(this->_config->executionNumber())); data.add(Utils::globalSuperstepKey, VPackValue(gss)); data.add(Utils::shardIdKey, VPackValue(shard)); data.add(Utils::messagesKey, VPackValue(VPackValueType::Array, true)); for (auto const& vertexMessagePair : vertexMessageMap) { data.add(VPackValuePair(vertexMessagePair.first.data(), vertexMessagePair.first.size(), VPackValueType::String)); // key data.add(VPackValue(VPackValueType::Array, true)); // message array for (M const& val : vertexMessagePair.second) { this->_format->addValue(data, val); if (this->_sendToNextGSS) { this->_sendCountNextGSS++; } else { this->_sendCount++; } } data.close(); } data.close(); data.close(); // add a request ShardID const& shardId = this->_config->globalShardIDs()[shard]; responses.emplace_back(network::sendRequest(pool, "shard:" + shardId, fuerte::RestVerb::Post, this->_baseUrl + Utils::messagesPath, std::move(buffer), reqOpts)); } futures::collectAll(responses).wait(); this->_removeContainedMessages(); } // ================= CombiningOutCache ================== template CombiningOutCache::CombiningOutCache(WorkerConfig* state, MessageFormat const* format, MessageCombiner const* combiner) : OutCache(state, format), _combiner(combiner) {} template CombiningOutCache::~CombiningOutCache() = default; template void CombiningOutCache::_removeContainedMessages() { for (auto& pair : _shardMap) { pair.second.clear(); } this->_containedMessages = 0; } template void CombiningOutCache::appendMessage(PregelShard shard, VPackStringRef const& key, M const& data) { if (this->_config->isLocalVertexShard(shard)) { if (this->_sendToNextGSS) { this->_localCacheNextGSS->storeMessage(shard, key, data); this->_sendCountNextGSS++; } else { this->_localCache->storeMessageNoLock(shard, key, data); this->_sendCount++; } } else { std::unordered_map& vertexMap = _shardMap[shard]; auto it = vertexMap.find(key); if (it != vertexMap.end()) { // more than one message _combiner->combine(vertexMap[key], data); } else { // first message for this vertex vertexMap.try_emplace(key, data); if (++(this->_containedMessages) >= this->_batchSize) { // LOG_TOPIC("23bc7", INFO, Logger::PREGEL) << "Hit buffer limit"; flushMessages(); } } } } template void CombiningOutCache::flushMessages() { if (this->_containedMessages == 0) { return; } uint64_t gss = this->_config->globalSuperstep(); if (this->_sendToNextGSS && this->_config->asynchronousMode()) { gss += 1; } VPackOptions options = VPackOptions::Defaults; options.buildUnindexedArrays = true; options.buildUnindexedObjects = true; application_features::ApplicationServer& server = this->_config->vocbase()->server(); auto const& nf = server.getFeature(); network::ConnectionPool* pool = nf.pool(); std::vector> responses; for (auto const& it : _shardMap) { PregelShard shard = it.first; std::unordered_map const& vertexMessageMap = it.second; if (vertexMessageMap.size() == 0) { continue; } VPackBuffer buffer; VPackBuilder data(buffer, &options); data.openObject(); data.add(Utils::senderKey, VPackValue(ServerState::instance()->getId())); data.add(Utils::executionNumberKey, VPackValue(this->_config->executionNumber())); data.add(Utils::globalSuperstepKey, VPackValue(gss)); data.add(Utils::shardIdKey, VPackValue(shard)); data.add(Utils::messagesKey, VPackValue(VPackValueType::Array, true)); for (auto const& vertexMessagePair : vertexMessageMap) { data.add(VPackValuePair(vertexMessagePair.first.data(), vertexMessagePair.first.size(), VPackValueType::String)); // key this->_format->addValue(data, vertexMessagePair.second); // value if (this->_sendToNextGSS) { this->_sendCountNextGSS++; } else { this->_sendCount++; } } data.close(); data.close(); // add a request ShardID const& shardId = this->_config->globalShardIDs()[shard]; network::RequestOptions reqOpts; reqOpts.timeout = network::Timeout(180); reqOpts.skipScheduler = true; responses.emplace_back(network::sendRequest(pool, "shard:" + shardId, fuerte::RestVerb::Post, this->_baseUrl + Utils::messagesPath, std::move(buffer), reqOpts)); } futures::collectAll(responses).wait(); _removeContainedMessages(); } // template types to create template class arangodb::pregel::OutCache; template class arangodb::pregel::OutCache; template class arangodb::pregel::OutCache; template class arangodb::pregel::OutCache; template class arangodb::pregel::ArrayOutCache; template class arangodb::pregel::ArrayOutCache; template class arangodb::pregel::ArrayOutCache; template class arangodb::pregel::ArrayOutCache; template class arangodb::pregel::CombiningOutCache; template class arangodb::pregel::CombiningOutCache; template class arangodb::pregel::CombiningOutCache; template class arangodb::pregel::CombiningOutCache; // algo specific template class arangodb::pregel::OutCache>; template class arangodb::pregel::ArrayOutCache>; template class arangodb::pregel::CombiningOutCache>; template class arangodb::pregel::OutCache>; template class arangodb::pregel::ArrayOutCache>; template class arangodb::pregel::CombiningOutCache>; template class arangodb::pregel::OutCache; template class arangodb::pregel::ArrayOutCache; template class arangodb::pregel::CombiningOutCache; template class arangodb::pregel::OutCache; template class arangodb::pregel::ArrayOutCache; template class arangodb::pregel::CombiningOutCache;