1
0
Fork 0

Forward port some changes (#8949)

This commit is contained in:
Simon 2019-05-09 19:42:06 +02:00 committed by Jan
parent 162bcd205c
commit 6075fec35e
6 changed files with 45 additions and 7 deletions

View File

@ -178,6 +178,7 @@ std::map<CollectionID, std::vector<VertexShardInfo>> GraphStore<V, E>::_allocate
// eCount * _graphFormat->estimatedEdgeSize();
// if (!_config->lazyLoading() &&
// (_config->useMemoryMaps() || requiredMem > totalMemory / 2)) {
// LOG_TOPIC(DEBUG, Logger::PREGEL) << "Using memory mapped storage";
// if (_graphFormat->estimatedVertexSize() > 0) {
// _vertexData = new MappedFileBuffer<V>(vCount);
// }

View File

@ -109,7 +109,7 @@ int Utils::resolveShard(WorkerConfig const* config, std::string const& collectio
std::shared_ptr<LogicalCollection> info;
auto const& it = planIDMap.find(collectionName);
if (it != planIDMap.end()) {
info = ci->getCollection(config->database(), it->second); // might throw
info = ci->getCollectionNT(config->database(), it->second); // might throw
if (info == nullptr) {
return TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND;
}

View File

@ -622,15 +622,33 @@ template <typename V, typename E, typename M>
void Worker<V, E, M>::aqlResult(VPackBuilder& b) const {
MUTEX_LOCKER(guard, _commandMutex);
TRI_ASSERT(b.isEmpty());
// std::vector<ShardID> const& shards = _config.globalShardIDs();
std::string tmp;
b.openArray();
b.openArray(/*unindexed*/true);
auto it = _graphStore->vertexIterator();
for (VertexEntry const* vertexEntry : it) {
V* data = _graphStore->mutableVertexData(vertexEntry);
b.openObject();
TRI_ASSERT(vertexEntry->shard() < _config.globalShardIDs().size());
ShardID const& shardId = _config.globalShardIDs()[vertexEntry->shard()];
b.openObject(/*unindexed*/true);
std::string const& cname = _config.shardIDToCollectionName(shardId);
if (!cname.empty()) {
tmp.clear();
tmp.append(cname);
tmp.push_back('/');
tmp.append(vertexEntry->key());
b.add(StaticStrings::IdString, VPackValue(tmp));
}
b.add(StaticStrings::KeyString, VPackValuePair(vertexEntry->key().data(),
vertexEntry->key().size(),
VPackValueType::String));
V* data = _graphStore->mutableVertexData(vertexEntry);
// bool store =
_graphStore->graphFormat()->buildVertexDocument(b, data, sizeof(V));
b.close();

View File

@ -84,6 +84,8 @@ void WorkerConfig::updateConfig(VPackSlice params) {
// every have
// edges in the third edge shard. This should speed up the startup
for (auto const& pair : VPackObjectIterator(vertexShardMap)) {
CollectionID cname = pair.key.copyString();
std::vector<ShardID> shards;
for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) {
ShardID shard = shardSlice.copyString();
@ -91,20 +93,24 @@ void WorkerConfig::updateConfig(VPackSlice params) {
_localVertexShardIDs.push_back(shard);
_localPregelShardIDs.insert(_pregelShardIDs[shard]);
_localPShardIDs_hash.insert(_pregelShardIDs[shard]);
_shardToCollectionName.emplace(shard, cname);
}
_vertexCollectionShards.emplace(pair.key.copyString(), shards);
_vertexCollectionShards.emplace(cname, shards);
}
// Ordered list of edge shards for each collection
for (auto const& pair : VPackObjectIterator(edgeShardMap)) {
CollectionID cname = pair.key.copyString();
std::vector<ShardID> shards;
for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) {
ShardID shard = shardSlice.copyString();
shards.push_back(shard);
_localEdgeShardIDs.push_back(shard);
_shardToCollectionName.emplace(shard, cname);
}
_edgeCollectionShards.emplace(pair.key.copyString(), shards);
}
_edgeCollectionShards.emplace(cname, shards);
}
}
PregelID WorkerConfig::documentIdToPregel(std::string const& documentID) const {

View File

@ -81,6 +81,14 @@ class WorkerConfig {
return _collectionPlanIdMap;
};
std::string const& shardIDToCollectionName(ShardID const& shard) const {
auto const& it = _shardToCollectionName.find(shard);
if (it != _shardToCollectionName.end()) {
return it->second;
}
return StaticStrings::Empty;
}
// same content on every worker, has to stay equal!!!!
inline std::vector<ShardID> const& globalShardIDs() const {
return _globalShardIDs;
@ -137,6 +145,8 @@ class WorkerConfig {
std::vector<ShardID> _localVertexShardIDs, _localEdgeShardIDs;
std::unordered_map<std::string, std::string> _collectionPlanIdMap;
std::map<ShardID, std::string> _shardToCollectionName;
// Map from edge collection to their shards, only iterated over keep sorted
std::map<CollectionID, std::vector<ShardID>> _vertexCollectionShards, _edgeCollectionShards;

View File

@ -185,6 +185,9 @@ function basicTestSuite() {
let v = vertices.document(d._key);
assertTrue(v !== null);
assertTrue(Math.abs(v.pagerank - d.result) < EPS);
let v2 = db._document(d._id);
assertEqual(v, v2);
});
break;
}