diff --git a/arangod/Pregel/Conductor.cpp b/arangod/Pregel/Conductor.cpp index 981decd950..ae4c27bafb 100644 --- a/arangod/Pregel/Conductor.cpp +++ b/arangod/Pregel/Conductor.cpp @@ -58,14 +58,14 @@ static IAggregatorCreator* resolveAlgorithm(std::string name, Conductor::Conductor( uint64_t executionNumber, TRI_vocbase_t* vocbase, std::vector> const& vertexCollections, - std::shared_ptr edgeCollection, + std::vector> const& edgeCollections, std::string const& algorithm) : _vocbaseGuard(vocbase), _executionNumber(executionNumber), _algorithm(algorithm), _state(ExecutionState::DEFAULT), _vertexCollections(vertexCollections), - _edgeCollection(edgeCollection) { + _edgeCollections(edgeCollections) { bool isCoordinator = ServerState::instance()->isCoordinator(); TRI_ASSERT(isCoordinator); LOG(INFO) << "constructed conductor"; @@ -88,7 +88,9 @@ static void printResults(std::vector const& requests) { } static void resolveShards(LogicalCollection const* collection, - std::map>& serverMap) { + std::map>> &serverMap) { + ClusterInfo* ci = ClusterInfo::instance(); std::shared_ptr> shardIDs = ci->getShardList(collection->cid_as_string()); @@ -97,7 +99,7 @@ static void resolveShards(LogicalCollection const* collection, std::shared_ptr> servers = ci->getResponsibleServer(shard); if (servers->size() > 0) { - serverMap[(*servers)[0]].push_back(shard); + serverMap[(*servers)[0]][collection->name()].push_back(shard); } } } @@ -115,34 +117,38 @@ void Conductor::start(VPackSlice userConfig) { _aggregatorUsage.reset(new AggregatorUsage(_agregatorCreator.get())); int64_t vertexCount = 0, edgeCount = 0; std::map collectionPlanIdMap; - std::map> edgeServerMap; + std::map>> vertexMap, edgeMap; + // resolve plan id's and shards on the servers for (auto &collection : _vertexCollections) { - collectionPlanIdMap[collection->name()] = collection->planId_as_string(); + collectionPlanIdMap.emplace(collection->name(), collection->planId_as_string()); int64_t cc = Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name()); if (cc > 0) { vertexCount += cc; - resolveShards(collection.get(), _vertexServerMap); - } else { - LOG(WARN) << "Collection does not contain vertices"; + resolveShards(collection.get(), vertexMap); } } - edgeCount = - Utils::countDocuments(_vocbaseGuard.vocbase(), _edgeCollection->name()); - if (edgeCount > 0) { - resolveShards(_edgeCollection.get(), edgeServerMap); - } else { - LOG(WARN) << "Collection does not contain edges"; + for (auto &collection : _edgeCollections) { + collectionPlanIdMap.emplace(collection->name(), collection->planId_as_string()); + int64_t cc = + Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name()); + if (cc > 0) { + edgeCount += cc; + resolveShards(collection.get(), edgeMap); + } } - + for (auto const& pair : vertexMap) { + _dbServers.push_back(pair.first); + } + std::string const baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name()); _globalSuperstep = 0; _state = ExecutionState::RUNNING; - _dbServerCount = _vertexServerMap.size(); + _dbServerCount = _dbServers.size(); _responseCount = 0; _doneCount = 0; - if (_vertexServerMap.size() != edgeServerMap.size()) { + if (vertexMap.size() != edgeMap.size()) { THROW_ARANGO_EXCEPTION_MESSAGE( TRI_ERROR_BAD_PARAMETER, "Vertex and edge collections are not sharded correctly"); @@ -151,7 +157,11 @@ void Conductor::start(VPackSlice userConfig) { std::string coordinatorId = ServerState::instance()->getId(); LOG(INFO) << "My id: " << coordinatorId; std::vector requests; - for (auto const& it : _vertexServerMap) { + for (auto const& it : vertexMap) { + ServerID const& server = it.first; + std::map> const& vertexShardMap = it.second; + std::map> const& edgeShardMap = edgeMap[it.first]; + VPackBuilder b; b.openObject(); b.add(Utils::executionNumberKey, VPackValue(_executionNumber)); @@ -161,18 +171,24 @@ void Conductor::start(VPackSlice userConfig) { b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId)); b.add(Utils::totalVertexCount, VPackValue(vertexCount)); b.add(Utils::totalEdgeCount, VPackValue(edgeCount)); - b.add(Utils::vertexShardsListKey, VPackValue(VPackValueType::Array)); - for (ShardID const& vit : it.second) { - b.add(VPackValue(vit)); + b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object)); + for (auto const& pair : vertexShardMap) { + b.add(pair.first, VPackValue(VPackValueType::Array)); + for (ShardID const& shard : pair.second) { + b.add(VPackValue(shard)); + } + b.close(); } b.close(); - b.add(Utils::edgeShardsListKey, VPackValue(VPackValueType::Array)); - for (ShardID const& eit : edgeServerMap[it.first]) { - b.add(VPackValue(eit)); + b.add(Utils::edgeShardsKey, VPackValue(VPackValueType::Object)); + for (auto const& pair : edgeShardMap) { + b.add(pair.first, VPackValue(VPackValueType::Array)); + for (ShardID const& shard : pair.second) { + b.add(VPackValue(shard)); + } + b.close(); } b.close(); - b.add(Utils::edgeCollectionPlanIdKey, - VPackValue(_edgeCollection->planId_as_string())); b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object)); for (auto const& pair : collectionPlanIdMap) { b.add(pair.first, VPackValue(pair.second)); @@ -181,7 +197,7 @@ void Conductor::start(VPackSlice userConfig) { b.close(); auto body = std::make_shared(b.toJson()); - requests.emplace_back("server:" + it.first, rest::RequestType::POST, + requests.emplace_back("server:" + server, rest::RequestType::POST, baseUrl + Utils::startExecutionPath, body); } @@ -293,7 +309,7 @@ void Conductor::cancel() { _state = ExecutionState::CANCELED; } int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) { ClusterComm* cc = ClusterComm::instance(); - _dbServerCount = _vertexServerMap.size(); + _dbServerCount = _dbServers.size(); _responseCount = 0; _doneCount = 0; @@ -304,8 +320,8 @@ int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) { auto body = std::make_shared(config.toJson()); std::vector requests; - for (auto const& it : _vertexServerMap) { - requests.emplace_back("server:" + it.first, rest::RequestType::POST, path, + for (auto const& server : _dbServers) { + requests.emplace_back("server:" + server, rest::RequestType::POST, path, body); } diff --git a/arangod/Pregel/Conductor.h b/arangod/Pregel/Conductor.h index 7e6673965c..9754725236 100644 --- a/arangod/Pregel/Conductor.h +++ b/arangod/Pregel/Conductor.h @@ -46,12 +46,12 @@ class Conductor { const std::string _algorithm; ExecutionState _state; std::vector> _vertexCollections; - std::shared_ptr _edgeCollection; + std::vector> _edgeCollections; + std::vector _dbServers; // initialized on startup std::unique_ptr _agregatorCreator; std::unique_ptr _aggregatorUsage; - std::map> _vertexServerMap; uint64_t _globalSuperstep = 0; int32_t _dbServerCount = 0; @@ -69,7 +69,7 @@ class Conductor { public: Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase, std::vector> const& vertexCollections, - std::shared_ptr edgeCollection, + std::vector> const& edgeCollections, std::string const& algorithm); ~Conductor(); diff --git a/arangod/Pregel/GraphStore.cpp b/arangod/Pregel/GraphStore.cpp index 07cc373fec..ded0bd115b 100644 --- a/arangod/Pregel/GraphStore.cpp +++ b/arangod/Pregel/GraphStore.cpp @@ -28,7 +28,7 @@ #include "Indexes/Index.h" #include "Utils.h" #include "Utils/OperationCursor.h" -#include "Utils/SingleCollectionTransaction.h" +#include "Utils/ExplicitTransaction.h" #include "Utils/StandaloneTransactionContext.h" #include "Utils/Transaction.h" #include "VocBase/EdgeCollectionInfo.h" @@ -41,16 +41,49 @@ using namespace arangodb; using namespace arangodb::pregel; template -GraphStore::GraphStore(TRI_vocbase_t* vocbase, const WorkerState* state, +GraphStore::GraphStore(TRI_vocbase_t* vb, const WorkerState* state, GraphFormat* graphFormat) - : _vocbaseGuard(vocbase), _workerState(state), _graphFormat(graphFormat) { - _edgeCollection = ClusterInfo::instance()->getCollection( - vocbase->name(), state->edgeCollectionPlanId()); + : _vocbaseGuard(vb), _workerState(state), _graphFormat(graphFormat) { +// _edgeCollection = ClusterInfo::instance()->getCollection( +// vb->name(), state->edgeCollectionPlanId()); + std::vector readColls, writeColls; + for (auto const& pair : state->vertexCollectionShards()) { + for (auto const& shard : pair.second) { + readColls.push_back(shard); + } + } + for (auto const& pair : state->edgeCollectionShards()) { + for (auto const& shard : pair.second) { + readColls.push_back(shard); + } + } + double lockTimeout = + (double)(TRI_TRANSACTION_DEFAULT_LOCK_TIMEOUT / 1000000ULL); + _transaction = new ExplicitTransaction(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), + readColls, writeColls, + lockTimeout, false, false); + int res = _transaction->begin(); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } - for (auto& shard : state->localVertexShardIDs()) { - loadVertices(shard); + std::map> const& vertexMap = state->vertexCollectionShards(); + std::map> const& edgeMap = state->edgeCollectionShards(); + for (auto const& pair : vertexMap) { + std::vector const& vertexShards = pair.second; + for (size_t i = 0; i < vertexShards.size(); i++) { + // distributeshardslike should cause the edges for a vertex to be + // in the same shard index. x in vertexShard2 => E(x) in edgeShard2 + for (auto const& pair2 : edgeMap) { + std::vector const& edgeShards = pair2.second; + TRI_ASSERT(vertexShards.size() == edgeShards.size()); + loadVertices(vertexShards[i], edgeShards[i]); + } + } } cleanupTransactions(); + + LOG(INFO) << "Loaded " << _index.size() << "vertices and " << _edges.size() << " edges"; } template @@ -88,7 +121,7 @@ RangeIterator> GraphStore::edgeIterator(VertexEntry const* en } - +/* template SingleCollectionTransaction* GraphStore::readTransaction(ShardID const& shard) { auto it = _transactions.find(shard); @@ -106,11 +139,11 @@ SingleCollectionTransaction* GraphStore::readTransaction(ShardID const& sh _transactions[shard] = trx.get(); return trx.release(); } -} +}*/ template void GraphStore::cleanupTransactions() { - for (auto const& it : _transactions) { // clean transactions + /*for (auto const& it : _transactions) { // clean transactions if (it.second->getStatus() == TRI_TRANSACTION_RUNNING) { if (it.second->commit() != TRI_ERROR_NO_ERROR) { LOG(WARN) << "Pregel worker: Failed to commit on a read transaction"; @@ -118,36 +151,35 @@ void GraphStore::cleanupTransactions() { } delete (it.second); } - _transactions.clear(); + _transactions.clear();*/ + if (_transaction) { + if (_transaction->getStatus() == TRI_TRANSACTION_RUNNING) { + if (_transaction->commit() != TRI_ERROR_NO_ERROR) { + LOG(WARN) << "Pregel worker: Failed to commit on a read transaction"; + } + } + delete _transaction; + _transaction = nullptr; + } } template -void GraphStore::loadVertices(ShardID const& vertexShard) { +void GraphStore::loadVertices(ShardID const& vertexShard, ShardID const& edgeShard) { //_graphFormat->willUseCollection(vocbase, vertexShard, false); bool storeData = _graphFormat->storesVertexData(); - SingleCollectionTransaction trx( - StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), - vertexShard, TRI_TRANSACTION_READ); + TRI_voc_cid_t cid = _transaction->addCollectionAtRuntime(vertexShard); + _transaction->orderDitch(cid); // will throw when it fails - int res = trx.begin(); + /*int res = _transaction->lockRead(); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", vertexShard.c_str()); - } + }*/ - TRI_voc_cid_t cid = trx.addCollectionAtRuntime(vertexShard); - trx.orderDitch(cid); // will throw when it fails - - res = trx.lockRead(); - if (res != TRI_ERROR_NO_ERROR) { - THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", - vertexShard.c_str()); - } - - ManagedDocumentResult mmdr(&trx); - std::unique_ptr cursor = trx.indexScan( + ManagedDocumentResult mmdr(_transaction); + std::unique_ptr cursor = _transaction->indexScan( vertexShard, Transaction::CursorType::ALL, Transaction::IndexHandle(), {}, &mmdr, 0, UINT64_MAX, 1000, false); @@ -164,7 +196,7 @@ void GraphStore::loadVertices(ShardID const& vertexShard) { cursor->getMoreMptr(result, 1000); for (auto const& element : result) { TRI_voc_rid_t revisionId = element.revisionId(); - if (collection->readRevision(&trx, mmdr, revisionId)) { + if (collection->readRevision(_transaction, mmdr, revisionId)) { VPackSlice document(mmdr.vpack()); if (document.isExternal()) { document = document.resolveExternal(); @@ -172,7 +204,7 @@ void GraphStore::loadVertices(ShardID const& vertexShard) { LOG(INFO) << "Loaded Vertex: " << document.toJson(); - std::string vertexId = trx.extractIdString(document); + std::string vertexId = _transaction->extractIdString(document); VertexEntry entry(vertexId); if (storeData) { V vertexData; @@ -185,42 +217,44 @@ void GraphStore::loadVertices(ShardID const& vertexShard) { LOG(ERR) << "Could not load vertex " << document.toJson(); } } - loadEdges(entry); + + loadEdges(edgeShard, entry); _index.push_back(entry); } } } - res = trx.unlockRead(); + /*res = trx.unlockRead(); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up shard '%s'", vertexShard.c_str()); } - //_shardsPlanIdMap[vertexShard] = - // trx.documentCollection()->planId_as_string(); + res = trx.finish(res); if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", vertexShard.c_str()); - } + }//*/ } template -void GraphStore::loadEdges(VertexEntry& vertexEntry) { +void GraphStore::loadEdges(ShardID const& shard, VertexEntry& vertexEntry) { //_graphFormat->willUseCollection(vocbase, edgeShard, true); const bool storeData = _graphFormat->storesEdgeData(); std::string const& _from = vertexEntry.vertexID(); const std::string _key = Utils::vertexKeyFromToValue(_from); - ShardID shard; - Utils::resolveShard(_edgeCollection.get(), Utils::edgeShardingKey, _key, - shard); + - SingleCollectionTransaction* trx = readTransaction(shard); - traverser::EdgeCollectionInfo info(trx, shard, TRI_EDGE_OUT, + /*ShardID shard; + Utils::resolveShard(_edgeCollection.get(), Utils::edgeShardingKey, + _key, shard);*/ + + //Transaction* trx = readTransaction(shard); + traverser::EdgeCollectionInfo info(_transaction, shard, TRI_EDGE_OUT, StaticStrings::FromString, 0); - ManagedDocumentResult mmdr(trx); + ManagedDocumentResult mmdr(_transaction); auto cursor = info.getEdges(_from, &mmdr); if (cursor->failed()) { THROW_ARANGO_EXCEPTION_FORMAT(cursor->code, @@ -240,7 +274,7 @@ void GraphStore::loadEdges(VertexEntry& vertexEntry) { cursor->getMoreMptr(result, 1000); for (auto const& element : result) { TRI_voc_rid_t revisionId = element.revisionId(); - if (collection->readRevision(trx, mmdr, revisionId)) { + if (collection->readRevision(_transaction, mmdr, revisionId)) { VPackSlice document(mmdr.vpack()); if (document.isExternal()) { document = document.resolveExternal(); @@ -276,7 +310,7 @@ void GraphStore::loadEdges(VertexEntry& vertexEntry) { }*/ } -template +/*template SingleCollectionTransaction* GraphStore::writeTransaction(ShardID const& shard) { auto it = _transactions.find(shard); @@ -295,11 +329,28 @@ SingleCollectionTransaction* GraphStore::writeTransaction(ShardID const& s _transactions[shard] = trx.get(); return trx.release(); } -} +}*/ template void GraphStore::storeResults() { + std::vector readColls, writeColls; + for (auto shard : _workerState->localVertexShardIDs() ) { + writeColls.push_back(shard); + } + //for (auto shard : _workerState->localEdgeShardIDs() ) { + // writeColls(shard); + //} + double lockTimeout = + (double)(TRI_TRANSACTION_DEFAULT_LOCK_TIMEOUT / 1000000ULL); + _transaction = new ExplicitTransaction(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()), + readColls, writeColls, + lockTimeout, false, false); + int res = _transaction->begin(); + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + OperationOptions options; for (auto& vertexEntry : _index) { @@ -316,7 +367,6 @@ void GraphStore::storeResults() { std::string shard; Utils::resolveShard(collInfo.get(), StaticStrings::KeyString, _key, shard); - SingleCollectionTransaction* trx = writeTransaction(shard); void* data = mutableVertexData(&vertexEntry); VPackBuilder b; @@ -325,7 +375,7 @@ void GraphStore::storeResults() { _graphFormat->buildVertexDocument(b, data, sizeof(V)); b.close(); - OperationResult result = trx->update(shard, b.slice(), options); + OperationResult result = _transaction->update(shard, b.slice(), options); if (result.code != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION(result.code); } diff --git a/arangod/Pregel/GraphStore.h b/arangod/Pregel/GraphStore.h index 11645a3795..3606bcb900 100644 --- a/arangod/Pregel/GraphStore.h +++ b/arangod/Pregel/GraphStore.h @@ -26,12 +26,13 @@ #include #include #include "Cluster/ClusterInfo.h" +#include "VocBase/voc-types.h" #include "GraphFormat.h" struct TRI_vocbase_t; namespace arangodb { -class SingleCollectionTransaction; +class Transaction; class LogicalCollection; namespace pregel { @@ -43,6 +44,8 @@ struct EdgeEntry { // size_t _nextEntryOffset; // size_t _dataSize; std::string _toVertexID; + TRI_voc_cid_t sourceShardID; + TRI_voc_cid_t targetShardID; E _data; // size_t _vertexIDSize; // char _vertexID[1]; @@ -246,15 +249,14 @@ class GraphStore { VocbaseGuard _vocbaseGuard; const WorkerState* _workerState; const std::unique_ptr> _graphFormat; - std::unordered_map _transactions; - std::shared_ptr _edgeCollection; - SingleCollectionTransaction* readTransaction(ShardID const& shard); - SingleCollectionTransaction* writeTransaction(ShardID const& shard); + Transaction *_transaction; + //SingleCollectionTransaction* readTransaction(ShardID const& shard); + //SingleCollectionTransaction* writeTransaction(ShardID const& shard); void cleanupTransactions(); - void loadVertices(ShardID const& vertexShard); - void loadEdges(VertexEntry& entry); + void loadVertices(ShardID const& vertexShard, ShardID const& edgeShard); + void loadEdges(ShardID const& edgeShard, VertexEntry& entry); public: GraphStore(TRI_vocbase_t* vocbase, const WorkerState* state, diff --git a/arangod/Pregel/IncomingCache.h b/arangod/Pregel/IncomingCache.h index 39637c2b33..8707d8ed5d 100644 --- a/arangod/Pregel/IncomingCache.h +++ b/arangod/Pregel/IncomingCache.h @@ -44,7 +44,7 @@ template class IncomingCache { public: IncomingCache(MessageFormat const* format, MessageCombiner const* combiner) - : _format(format), _combiner(combiner), _receivedMessageCount(0) {} + : _receivedMessageCount(0), _format(format), _combiner(combiner) {} ~IncomingCache(); void parseMessages(VPackSlice messages); diff --git a/arangod/Pregel/README.md b/arangod/Pregel/README.md index 9645227dc0..f59973571b 100644 --- a/arangod/Pregel/README.md +++ b/arangod/Pregel/README.md @@ -27,10 +27,7 @@ In arangosh: db._create('vertices', {numberOfShards: 2}); db._createEdgeCollection('alt_edges'); - db._createEdgeCollection('edges', {numberOfShards: 2, - shardKeys:["_vertex"], - distributeShardsLike:'vertices' - }); + db._createEdgeCollection('edges', {numberOfShards: 2, shardKeys:["_vertex"], distributeShardsLike:'vertices'}); arangoimp --file generated_vertices.csv --type csv --collection vertices --overwrite true --server.endpoint http+tcp://127.0.0.1:8530 diff --git a/arangod/Pregel/Utils.cpp b/arangod/Pregel/Utils.cpp index 88a22256c2..e6ecb38728 100644 --- a/arangod/Pregel/Utils.cpp +++ b/arangod/Pregel/Utils.cpp @@ -45,9 +45,8 @@ std::string const Utils::finalizeExecutionPath = "finalizeExecution"; std::string const Utils::executionNumberKey = "exn"; std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap"; -std::string const Utils::edgeCollectionPlanIdKey = "edgePlanId"; -std::string const Utils::vertexShardsListKey = "vertexShards"; -std::string const Utils::edgeShardsListKey = "edgeShards"; +std::string const Utils::vertexShardsKey = "vertexShards"; +std::string const Utils::edgeShardsKey = "edgeShards"; std::string const Utils::coordinatorIdKey = "coordinatorId"; std::string const Utils::algorithmKey = "algorithm"; diff --git a/arangod/Pregel/Utils.h b/arangod/Pregel/Utils.h index f39267a0fd..e0743a9504 100644 --- a/arangod/Pregel/Utils.h +++ b/arangod/Pregel/Utils.h @@ -51,9 +51,8 @@ class Utils { static std::string const algorithmKey; static std::string const coordinatorIdKey; static std::string const collectionPlanIdMapKey; - static std::string const edgeCollectionPlanIdKey; - static std::string const vertexShardsListKey; - static std::string const edgeShardsListKey; + static std::string const vertexShardsKey; + static std::string const edgeShardsKey; static std::string const globalSuperstepKey; static std::string const messagesKey; diff --git a/arangod/Pregel/WorkerState.cpp b/arangod/Pregel/WorkerState.cpp index a1521d640f..db09091ab4 100644 --- a/arangod/Pregel/WorkerState.cpp +++ b/arangod/Pregel/WorkerState.cpp @@ -31,14 +31,13 @@ using namespace arangodb::pregel; WorkerState::WorkerState(DatabaseID dbname, VPackSlice params) : _database(dbname) { VPackSlice coordID = params.get(Utils::coordinatorIdKey); - VPackSlice vertexShardIDs = params.get(Utils::vertexShardsListKey); - VPackSlice edgeShardIDs = params.get(Utils::edgeShardsListKey); + VPackSlice vertexShardMap = params.get(Utils::vertexShardsKey); + VPackSlice edgeShardMap = params.get(Utils::edgeShardsKey); VPackSlice execNum = params.get(Utils::executionNumberKey); VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey); - VPackSlice edgePlanID = params.get(Utils::edgeCollectionPlanIdKey); - if (!coordID.isString() || !vertexShardIDs.isArray() || - !edgeShardIDs.isArray() || !execNum.isInteger() || - !collectionPlanIdMap.isObject() || !edgePlanID.isString()) { + if (!coordID.isString() || !edgeShardMap.isObject() || + !vertexShardMap.isObject() || !execNum.isInteger() || + !collectionPlanIdMap.isObject()) { THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, "Supplied bad parameters to worker"); } @@ -47,22 +46,25 @@ WorkerState::WorkerState(DatabaseID dbname, VPackSlice params) //_vertexCollectionName = vertexCollName.copyString(); //_vertexCollectionPlanId = vertexCollPlanId.copyString(); - LOG(INFO) << "Local Shards:"; - VPackArrayIterator vertices(vertexShardIDs); - for (VPackSlice shardSlice : vertices) { - ShardID name = shardSlice.copyString(); - _localVertexShardIDs.push_back(name); - LOG(INFO) << name; + for (auto const& pair : VPackObjectIterator(vertexShardMap)) { + std::vector shards; + for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) { + ShardID shard = shardSlice.copyString(); + shards.push_back(shard); + _localVertexShardIDs.push_back(shard); + } + _vertexCollectionShards.emplace(pair.key.copyString(), shards); } - VPackArrayIterator edges(edgeShardIDs); - for (VPackSlice shardSlice : edges) { - ShardID name = shardSlice.copyString(); - _localEdgeShardIDs.push_back(name); - LOG(INFO) << name; + + for (auto const& pair : VPackObjectIterator(edgeShardMap)) { + std::vector shards; + for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) { + shards.push_back(shardSlice.copyString()); + } + _edgeCollectionShards.emplace(pair.key.copyString(), shards); } for (auto const& it : VPackObjectIterator(collectionPlanIdMap)) { _collectionPlanIdMap.emplace(it.key.copyString(), it.value.copyString()); } - _edgeCollectionPlanId = edgePlanID.copyString(); } diff --git a/arangod/Pregel/WorkerState.h b/arangod/Pregel/WorkerState.h index 001f1d8461..148e16845b 100644 --- a/arangod/Pregel/WorkerState.h +++ b/arangod/Pregel/WorkerState.h @@ -52,21 +52,21 @@ class WorkerState { inline std::string const& database() const { return _database; } - inline std::vector const& localVertexShardIDs() const { - return _localVertexShardIDs; + inline std::map> const& vertexCollectionShards() const { + return _vertexCollectionShards; } - inline std::vector const& localEdgeShardIDs() const { - return _localEdgeShardIDs; + inline std::map> const& edgeCollectionShards() const { + return _edgeCollectionShards; } - std::map const& collectionPlanIdMap() const { + inline std::map const& collectionPlanIdMap() const { return _collectionPlanIdMap; }; - - std::string const& edgeCollectionPlanId() const { - return _edgeCollectionPlanId; - } + + inline std::vector const& localVertexShardIDs() const { + return _localVertexShardIDs; + }; // inline uint64_t numWorkerThreads() { // return _numWorkerThreads; @@ -79,9 +79,10 @@ class WorkerState { std::string _coordinatorId; const std::string _database; - std::vector _localVertexShardIDs, _localEdgeShardIDs; + std::vector _localVertexShardIDs; + std::map> _vertexCollectionShards, _edgeCollectionShards; + std::map _collectionPlanIdMap; - std::string _edgeCollectionPlanId; }; } } diff --git a/arangod/Utils/ExplicitTransaction.h b/arangod/Utils/ExplicitTransaction.h index c3470b865c..7a365f9969 100644 --- a/arangod/Utils/ExplicitTransaction.h +++ b/arangod/Utils/ExplicitTransaction.h @@ -27,7 +27,7 @@ #include "Basics/Common.h" #include "Utils/Transaction.h" -#include "Utils/V8TransactionContext.h" +#include "Utils/TransactionContext.h" #include "VocBase/ticks.h" #include "VocBase/transaction.h" @@ -39,7 +39,7 @@ class ExplicitTransaction : public Transaction { /// @brief create the transaction ////////////////////////////////////////////////////////////////////////////// - ExplicitTransaction(std::shared_ptr transactionContext, + ExplicitTransaction(std::shared_ptr transactionContext, std::vector const& readCollections, std::vector const& writeCollections, double lockTimeout, bool waitForSync, diff --git a/arangod/V8Server/v8-collection.cpp b/arangod/V8Server/v8-collection.cpp index 54deecc008..02e7ba3134 100644 --- a/arangod/V8Server/v8-collection.cpp +++ b/arangod/V8Server/v8-collection.cpp @@ -1848,16 +1848,20 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { "_pregelStart(, , [, " "{steps:100, ...}]"); } - std::vector vertices; - if (args[0]->IsArray()) { - v8::Handle array = v8::Handle::Cast(args[0]); + auto parse = [](v8::Local const& value, std::vector &out) { + v8::Handle array = v8::Handle::Cast(value); uint32_t const n = array->Length(); for (uint32_t i = 0; i < n; ++i) { v8::Handle obj = array->Get(i); if (obj->IsString()) { - vertices.push_back(TRI_ObjectToString(obj)); + out.push_back(TRI_ObjectToString(obj)); } } + }; + + std::vector vertices, edges; + if (args[0]->IsArray()) { + parse(args[0], vertices); } else if (args[0]->IsString()) { vertices.push_back(TRI_ObjectToString(args[0])); } else { @@ -1866,10 +1870,16 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { if (vertices.size() == 0) { TRI_V8_THROW_EXCEPTION_USAGE("Specify at least one vertex collection"); } - if (!args[1]->IsString()) { - TRI_V8_THROW_EXCEPTION_USAGE("Specify an edge collection to use"); + if (args[1]->IsArray()) { + parse(args[1], edges); + } else if (args[1]->IsString()) { + edges.push_back(TRI_ObjectToString(args[1])); + } else { + TRI_V8_THROW_EXCEPTION_USAGE("Specify an array of edge collections (or a string)"); + } + if (edges.size() == 0) { + TRI_V8_THROW_EXCEPTION_USAGE("Specify at least one edge collection"); } - std::string edgeCName(TRI_ObjectToString(args[1])); std::string algorithm = TRI_ObjectToString(args[2]); VPackBuilder paramBuilder; if (argLength >= 4 && args[3]->IsObject()) { @@ -1879,14 +1889,11 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { } } - LOG(INFO) << "Called _pregelStart(" << vertices[0] << "," << edgeCName << ")"; - if (ServerState::instance()->isCoordinator()) { LOG(INFO) << "Called as a controller"; TRI_vocbase_t* vocbase = GetContextVocBase(isolate); - std::vector> vColls; - std::shared_ptr edgeCollection; + std::vector> vColls, eColls; try { for (std::string const& name : vertices) { auto coll = @@ -1901,17 +1908,20 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { } vColls.push_back(coll); } - edgeCollection = - ClusterInfo::instance()->getCollection(vocbase->name(), edgeCName); - if (edgeCollection->isSystem()) { - TRI_V8_THROW_EXCEPTION_USAGE( - "Cannot use pregel on system collection"); - } - std::vector eKeys = edgeCollection->shardKeys(); - if (eKeys.size() != 1 || eKeys[0] != "_vertex") { + for (std::string const& name : edges) { + auto coll = + ClusterInfo::instance()->getCollection(vocbase->name(), name); + if (coll->isSystem()) { + TRI_V8_THROW_EXCEPTION_USAGE( + "Cannot use pregel on system collection"); + } + std::vector eKeys = coll->shardKeys(); + if (eKeys.size() != 1 || eKeys[0] != "_vertex") { TRI_V8_THROW_EXCEPTION_USAGE( "Edge collection needs to be sharded after '_vertex', or use " "smart graphs"); + } + eColls.push_back(coll); } } catch (...) { TRI_V8_THROW_EXCEPTION_USAGE("Collections do not exist"); @@ -1921,7 +1931,7 @@ static void JS_PregelStart(v8::FunctionCallbackInfo const& args) { pregel::Conductor* c = new pregel::Conductor(en, vocbase, vColls, - edgeCollection, + eColls, algorithm); pregel::PregelFeature::instance()->addExecution(c, en); c->start(paramBuilder.slice());