diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index b69d8a9ae6..7d80b8f8ea 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -172,21 +172,21 @@ std::map> GraphStore::_allocate LOG_TOPIC(DEBUG, Logger::PREGEL) << "Estimating #numEdges: " << eCount; _index.resize(vCount); - size_t requiredMem = vCount * _graphFormat->estimatedVertexSize() + - eCount * _graphFormat->estimatedEdgeSize(); - if (!_config->lazyLoading() && - (_config->useMemoryMaps() || requiredMem > totalMemory / 2)) { - LOG_TOPIC(DEBUG, Logger::PREGEL) << "Using memory mapped storage"; - if (_graphFormat->estimatedVertexSize() > 0) { - _vertexData = new MappedFileBuffer(vCount); - } - _edges = new MappedFileBuffer>(eCount); - } else { +// size_t requiredMem = vCount * _graphFormat->estimatedVertexSize() + +// eCount * _graphFormat->estimatedEdgeSize(); +// if (!_config->lazyLoading() && +// (_config->useMemoryMaps() || requiredMem > totalMemory / 2)) { +// LOG_TOPIC(DEBUG, Logger::PREGEL) << "Using memory mapped storage"; +// if (_graphFormat->estimatedVertexSize() > 0) { +// _vertexData = new MappedFileBuffer(vCount); +// } +// _edges = new MappedFileBuffer>(eCount); +// } else { if (_graphFormat->estimatedVertexSize() > 0) { _vertexData = new VectorTypedBuffer(vCount); } _edges = new VectorTypedBuffer>(eCount); - } +// } return result; } diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 6946ea188f..a7932a7fd5 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -109,7 +109,7 @@ int Utils::resolveShard(WorkerConfig const* config, std::string const& collectio std::shared_ptr info; auto const& it = planIDMap.find(collectionName); if (it != planIDMap.end()) { - info = ci->getCollection(config->database(), it->second); // might throw + info = ci->getCollectionNT(config->database(), it->second); // might throw if (info == nullptr) { return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND; } diff --git a/arangod/Pregel/Worker.cpp b/arangod/Pregel/Worker.cpp index c3216fbbbd..952b64e7bf 100644 --- a/arangod/Pregel/Worker.cpp +++ b/arangod/Pregel/Worker.cpp @@ -623,15 +623,33 @@ template void Worker::aqlResult(VPackBuilder& b) const { MUTEX_LOCKER(guard, _commandMutex); TRI_ASSERT(b.isEmpty()); + +// std::vector const& shards = _config.globalShardIDs(); + std::string tmp; - b.openArray(); + b.openArray(/*unindexed*/true); auto it = _graphStore->vertexIterator(); for (VertexEntry const* vertexEntry : it) { - V* data = _graphStore->mutableVertexData(vertexEntry); - b.openObject(); + + TRI_ASSERT(vertexEntry->shard() < _config.globalShardIDs().size()); + ShardID const& shardId = _config.globalShardIDs()[vertexEntry->shard()]; + + b.openObject(/*unindexed*/true); + + std::string const& cname = _config.shardIDToCollectionName(shardId); + if (!cname.empty()) { + tmp.clear(); + tmp.append(cname); + tmp.push_back('/'); + tmp.append(vertexEntry->key()); + b.add(StaticStrings::IdString, VPackValue(tmp)); + } + b.add(StaticStrings::KeyString, VPackValuePair(vertexEntry->key().data(), vertexEntry->key().size(), VPackValueType::String)); + + V* data = _graphStore->mutableVertexData(vertexEntry); // bool store = _graphStore->graphFormat()->buildVertexDocument(b, data, sizeof(V)); b.close(); diff --git a/arangod/Pregel/WorkerConfig.cpp b/arangod/Pregel/WorkerConfig.cpp index a7a1f0d97a..15f099d056 100644 --- a/arangod/Pregel/WorkerConfig.cpp +++ b/arangod/Pregel/WorkerConfig.cpp @@ -84,6 +84,8 @@ void WorkerConfig::updateConfig(VPackSlice params) { // every have // edges in the third edge shard. This should speed up the startup for (auto const& pair : VPackObjectIterator(vertexShardMap)) { + CollectionID cname = pair.key.copyString(); + std::vector shards; for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) { ShardID shard = shardSlice.copyString(); @@ -91,20 +93,24 @@ void WorkerConfig::updateConfig(VPackSlice params) { _localVertexShardIDs.push_back(shard); _localPregelShardIDs.insert(_pregelShardIDs[shard]); _localPShardIDs_hash.insert(_pregelShardIDs[shard]); + _shardToCollectionName.emplace(shard, cname); } - _vertexCollectionShards.emplace(pair.key.copyString(), shards); + _vertexCollectionShards.emplace(cname, shards); } // Ordered list of edge shards for each collection for (auto const& pair : VPackObjectIterator(edgeShardMap)) { + CollectionID cname = pair.key.copyString(); + std::vector shards; for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) { ShardID shard = shardSlice.copyString(); shards.push_back(shard); _localEdgeShardIDs.push_back(shard); + _shardToCollectionName.emplace(shard, cname); } - _edgeCollectionShards.emplace(pair.key.copyString(), shards); - } + _edgeCollectionShards.emplace(cname, shards); + } } PregelID WorkerConfig::documentIdToPregel(std::string const& documentID) const { diff --git a/arangod/Pregel/WorkerConfig.h b/arangod/Pregel/WorkerConfig.h index eee5178e94..d9a6f7bcf1 100644 --- a/arangod/Pregel/WorkerConfig.h +++ b/arangod/Pregel/WorkerConfig.h @@ -81,6 +81,14 @@ class WorkerConfig { return _collectionPlanIdMap; }; + std::string const& shardIDToCollectionName(ShardID const& shard) const { + auto const& it = _shardToCollectionName.find(shard); + if (it != _shardToCollectionName.end()) { + return it->second; + } + return StaticStrings::Empty; + } + // same content on every worker, has to stay equal!!!! inline std::vector const& globalShardIDs() const { return _globalShardIDs; @@ -137,6 +145,8 @@ class WorkerConfig { std::vector _localVertexShardIDs, _localEdgeShardIDs; std::unordered_map _collectionPlanIdMap; + std::map _shardToCollectionName; + // Map from edge collection to their shards, only iterated over keep sorted std::map> _vertexCollectionShards, _edgeCollectionShards; diff --git a/tests/js/server/shell/shell-pregel.js b/tests/js/server/shell/shell-pregel.js index f7f1dde7a9..332acfd4a7 100644 --- a/tests/js/server/shell/shell-pregel.js +++ b/tests/js/server/shell/shell-pregel.js @@ -185,6 +185,9 @@ function basicTestSuite() { let v = vertices.document(d._key); assertTrue(v !== null); assertTrue(Math.abs(v.pagerank - d.result) < EPS); + + let v2 = db._document(d._id); + assertEqual(v, v2); }); break; }