1
0
Fork 0

Multiple edge collections

This commit is contained in:
Simon Grätzer 2016-11-18 22:02:13 +01:00
parent 9b1a11e68f
commit 6c6c2a092d
12 changed files with 226 additions and 150 deletions

View File

@ -58,14 +58,14 @@ static IAggregatorCreator* resolveAlgorithm(std::string name,
Conductor::Conductor(
uint64_t executionNumber, TRI_vocbase_t* vocbase,
std::vector<std::shared_ptr<LogicalCollection>> const& vertexCollections,
std::shared_ptr<LogicalCollection> edgeCollection,
std::vector<std::shared_ptr<LogicalCollection>> const& edgeCollections,
std::string const& algorithm)
: _vocbaseGuard(vocbase),
_executionNumber(executionNumber),
_algorithm(algorithm),
_state(ExecutionState::DEFAULT),
_vertexCollections(vertexCollections),
_edgeCollection(edgeCollection) {
_edgeCollections(edgeCollections) {
bool isCoordinator = ServerState::instance()->isCoordinator();
TRI_ASSERT(isCoordinator);
LOG(INFO) << "constructed conductor";
@ -88,7 +88,9 @@ static void printResults(std::vector<ClusterCommRequest> const& requests) {
}
static void resolveShards(LogicalCollection const* collection,
std::map<ServerID, std::vector<ShardID>>& serverMap) {
std::map<ServerID, std::map<CollectionID,
std::vector<ShardID>>> &serverMap) {
ClusterInfo* ci = ClusterInfo::instance();
std::shared_ptr<std::vector<ShardID>> shardIDs =
ci->getShardList(collection->cid_as_string());
@ -97,7 +99,7 @@ static void resolveShards(LogicalCollection const* collection,
std::shared_ptr<std::vector<ServerID>> servers =
ci->getResponsibleServer(shard);
if (servers->size() > 0) {
serverMap[(*servers)[0]].push_back(shard);
serverMap[(*servers)[0]][collection->name()].push_back(shard);
}
}
}
@ -115,34 +117,38 @@ void Conductor::start(VPackSlice userConfig) {
_aggregatorUsage.reset(new AggregatorUsage(_agregatorCreator.get()));
int64_t vertexCount = 0, edgeCount = 0;
std::map<CollectionID, std::string> collectionPlanIdMap;
std::map<ServerID, std::vector<ShardID>> edgeServerMap;
std::map<ServerID, std::map<CollectionID, std::vector<ShardID>>> vertexMap, edgeMap;
// resolve plan id's and shards on the servers
for (auto &collection : _vertexCollections) {
collectionPlanIdMap[collection->name()] = collection->planId_as_string();
collectionPlanIdMap.emplace(collection->name(), collection->planId_as_string());
int64_t cc =
Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name());
if (cc > 0) {
vertexCount += cc;
resolveShards(collection.get(), _vertexServerMap);
} else {
LOG(WARN) << "Collection does not contain vertices";
resolveShards(collection.get(), vertexMap);
}
}
edgeCount =
Utils::countDocuments(_vocbaseGuard.vocbase(), _edgeCollection->name());
if (edgeCount > 0) {
resolveShards(_edgeCollection.get(), edgeServerMap);
} else {
LOG(WARN) << "Collection does not contain edges";
for (auto &collection : _edgeCollections) {
collectionPlanIdMap.emplace(collection->name(), collection->planId_as_string());
int64_t cc =
Utils::countDocuments(_vocbaseGuard.vocbase(), collection->name());
if (cc > 0) {
edgeCount += cc;
resolveShards(collection.get(), edgeMap);
}
}
for (auto const& pair : vertexMap) {
_dbServers.push_back(pair.first);
}
std::string const baseUrl = Utils::baseUrl(_vocbaseGuard.vocbase()->name());
_globalSuperstep = 0;
_state = ExecutionState::RUNNING;
_dbServerCount = _vertexServerMap.size();
_dbServerCount = _dbServers.size();
_responseCount = 0;
_doneCount = 0;
if (_vertexServerMap.size() != edgeServerMap.size()) {
if (vertexMap.size() != edgeMap.size()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_BAD_PARAMETER,
"Vertex and edge collections are not sharded correctly");
@ -151,7 +157,11 @@ void Conductor::start(VPackSlice userConfig) {
std::string coordinatorId = ServerState::instance()->getId();
LOG(INFO) << "My id: " << coordinatorId;
std::vector<ClusterCommRequest> requests;
for (auto const& it : _vertexServerMap) {
for (auto const& it : vertexMap) {
ServerID const& server = it.first;
std::map<CollectionID, std::vector<ShardID>> const& vertexShardMap = it.second;
std::map<CollectionID, std::vector<ShardID>> const& edgeShardMap = edgeMap[it.first];
VPackBuilder b;
b.openObject();
b.add(Utils::executionNumberKey, VPackValue(_executionNumber));
@ -161,18 +171,24 @@ void Conductor::start(VPackSlice userConfig) {
b.add(Utils::coordinatorIdKey, VPackValue(coordinatorId));
b.add(Utils::totalVertexCount, VPackValue(vertexCount));
b.add(Utils::totalEdgeCount, VPackValue(edgeCount));
b.add(Utils::vertexShardsListKey, VPackValue(VPackValueType::Array));
for (ShardID const& vit : it.second) {
b.add(VPackValue(vit));
b.add(Utils::vertexShardsKey, VPackValue(VPackValueType::Object));
for (auto const& pair : vertexShardMap) {
b.add(pair.first, VPackValue(VPackValueType::Array));
for (ShardID const& shard : pair.second) {
b.add(VPackValue(shard));
}
b.close();
}
b.close();
b.add(Utils::edgeShardsListKey, VPackValue(VPackValueType::Array));
for (ShardID const& eit : edgeServerMap[it.first]) {
b.add(VPackValue(eit));
b.add(Utils::edgeShardsKey, VPackValue(VPackValueType::Object));
for (auto const& pair : edgeShardMap) {
b.add(pair.first, VPackValue(VPackValueType::Array));
for (ShardID const& shard : pair.second) {
b.add(VPackValue(shard));
}
b.close();
}
b.close();
b.add(Utils::edgeCollectionPlanIdKey,
VPackValue(_edgeCollection->planId_as_string()));
b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object));
for (auto const& pair : collectionPlanIdMap) {
b.add(pair.first, VPackValue(pair.second));
@ -181,7 +197,7 @@ void Conductor::start(VPackSlice userConfig) {
b.close();
auto body = std::make_shared<std::string const>(b.toJson());
requests.emplace_back("server:" + it.first, rest::RequestType::POST,
requests.emplace_back("server:" + server, rest::RequestType::POST,
baseUrl + Utils::startExecutionPath, body);
}
@ -293,7 +309,7 @@ void Conductor::cancel() { _state = ExecutionState::CANCELED; }
int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) {
ClusterComm* cc = ClusterComm::instance();
_dbServerCount = _vertexServerMap.size();
_dbServerCount = _dbServers.size();
_responseCount = 0;
_doneCount = 0;
@ -304,8 +320,8 @@ int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) {
auto body = std::make_shared<std::string const>(config.toJson());
std::vector<ClusterCommRequest> requests;
for (auto const& it : _vertexServerMap) {
requests.emplace_back("server:" + it.first, rest::RequestType::POST, path,
for (auto const& server : _dbServers) {
requests.emplace_back("server:" + server, rest::RequestType::POST, path,
body);
}

View File

@ -46,12 +46,12 @@ class Conductor {
const std::string _algorithm;
ExecutionState _state;
std::vector<std::shared_ptr<LogicalCollection>> _vertexCollections;
std::shared_ptr<LogicalCollection> _edgeCollection;
std::vector<std::shared_ptr<LogicalCollection>> _edgeCollections;
std::vector<ServerID> _dbServers;
// initialized on startup
std::unique_ptr<IAggregatorCreator> _agregatorCreator;
std::unique_ptr<AggregatorUsage> _aggregatorUsage;
std::map<ServerID, std::vector<ShardID>> _vertexServerMap;
uint64_t _globalSuperstep = 0;
int32_t _dbServerCount = 0;
@ -69,7 +69,7 @@ class Conductor {
public:
Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase,
std::vector<std::shared_ptr<LogicalCollection>> const& vertexCollections,
std::shared_ptr<LogicalCollection> edgeCollection,
std::vector<std::shared_ptr<LogicalCollection>> const& edgeCollections,
std::string const& algorithm);
~Conductor();

View File

@ -28,7 +28,7 @@
#include "Indexes/Index.h"
#include "Utils.h"
#include "Utils/OperationCursor.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/ExplicitTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "Utils/Transaction.h"
#include "VocBase/EdgeCollectionInfo.h"
@ -41,16 +41,49 @@ using namespace arangodb;
using namespace arangodb::pregel;
template <typename V, typename E>
GraphStore<V, E>::GraphStore(TRI_vocbase_t* vocbase, const WorkerState* state,
GraphStore<V, E>::GraphStore(TRI_vocbase_t* vb, const WorkerState* state,
GraphFormat<V, E>* graphFormat)
: _vocbaseGuard(vocbase), _workerState(state), _graphFormat(graphFormat) {
_edgeCollection = ClusterInfo::instance()->getCollection(
vocbase->name(), state->edgeCollectionPlanId());
: _vocbaseGuard(vb), _workerState(state), _graphFormat(graphFormat) {
// _edgeCollection = ClusterInfo::instance()->getCollection(
// vb->name(), state->edgeCollectionPlanId());
std::vector<std::string> readColls, writeColls;
for (auto const& pair : state->vertexCollectionShards()) {
for (auto const& shard : pair.second) {
readColls.push_back(shard);
}
}
for (auto const& pair : state->edgeCollectionShards()) {
for (auto const& shard : pair.second) {
readColls.push_back(shard);
}
}
double lockTimeout =
(double)(TRI_TRANSACTION_DEFAULT_LOCK_TIMEOUT / 1000000ULL);
_transaction = new ExplicitTransaction(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()),
readColls, writeColls,
lockTimeout, false, false);
int res = _transaction->begin();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
for (auto& shard : state->localVertexShardIDs()) {
loadVertices(shard);
std::map<CollectionID, std::vector<ShardID>> const& vertexMap = state->vertexCollectionShards();
std::map<CollectionID, std::vector<ShardID>> const& edgeMap = state->edgeCollectionShards();
for (auto const& pair : vertexMap) {
std::vector<ShardID> const& vertexShards = pair.second;
for (size_t i = 0; i < vertexShards.size(); i++) {
// distributeshardslike should cause the edges for a vertex to be
// in the same shard index. x in vertexShard2 => E(x) in edgeShard2
for (auto const& pair2 : edgeMap) {
std::vector<ShardID> const& edgeShards = pair2.second;
TRI_ASSERT(vertexShards.size() == edgeShards.size());
loadVertices(vertexShards[i], edgeShards[i]);
}
}
}
cleanupTransactions();
LOG(INFO) << "Loaded " << _index.size() << "vertices and " << _edges.size() << " edges";
}
template <typename V, typename E>
@ -88,7 +121,7 @@ RangeIterator<EdgeEntry<E>> GraphStore<V, E>::edgeIterator(VertexEntry const* en
}
/*
template <typename V, typename E>
SingleCollectionTransaction* GraphStore<V, E>::readTransaction(ShardID const& shard) {
auto it = _transactions.find(shard);
@ -106,11 +139,11 @@ SingleCollectionTransaction* GraphStore<V, E>::readTransaction(ShardID const& sh
_transactions[shard] = trx.get();
return trx.release();
}
}
}*/
template <typename V, typename E>
void GraphStore<V, E>::cleanupTransactions() {
for (auto const& it : _transactions) { // clean transactions
/*for (auto const& it : _transactions) { // clean transactions
if (it.second->getStatus() == TRI_TRANSACTION_RUNNING) {
if (it.second->commit() != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "Pregel worker: Failed to commit on a read transaction";
@ -118,36 +151,35 @@ void GraphStore<V, E>::cleanupTransactions() {
}
delete (it.second);
}
_transactions.clear();
_transactions.clear();*/
if (_transaction) {
if (_transaction->getStatus() == TRI_TRANSACTION_RUNNING) {
if (_transaction->commit() != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "Pregel worker: Failed to commit on a read transaction";
}
}
delete _transaction;
_transaction = nullptr;
}
}
template <typename V, typename E>
void GraphStore<V, E>::loadVertices(ShardID const& vertexShard) {
void GraphStore<V, E>::loadVertices(ShardID const& vertexShard, ShardID const& edgeShard) {
//_graphFormat->willUseCollection(vocbase, vertexShard, false);
bool storeData = _graphFormat->storesVertexData();
SingleCollectionTransaction trx(
StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()),
vertexShard, TRI_TRANSACTION_READ);
TRI_voc_cid_t cid = _transaction->addCollectionAtRuntime(vertexShard);
_transaction->orderDitch(cid); // will throw when it fails
int res = trx.begin();
/*int res = _transaction->lockRead();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'",
vertexShard.c_str());
}
}*/
TRI_voc_cid_t cid = trx.addCollectionAtRuntime(vertexShard);
trx.orderDitch(cid); // will throw when it fails
res = trx.lockRead();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'",
vertexShard.c_str());
}
ManagedDocumentResult mmdr(&trx);
std::unique_ptr<OperationCursor> cursor = trx.indexScan(
ManagedDocumentResult mmdr(_transaction);
std::unique_ptr<OperationCursor> cursor = _transaction->indexScan(
vertexShard, Transaction::CursorType::ALL, Transaction::IndexHandle(), {},
&mmdr, 0, UINT64_MAX, 1000, false);
@ -164,7 +196,7 @@ void GraphStore<V, E>::loadVertices(ShardID const& vertexShard) {
cursor->getMoreMptr(result, 1000);
for (auto const& element : result) {
TRI_voc_rid_t revisionId = element.revisionId();
if (collection->readRevision(&trx, mmdr, revisionId)) {
if (collection->readRevision(_transaction, mmdr, revisionId)) {
VPackSlice document(mmdr.vpack());
if (document.isExternal()) {
document = document.resolveExternal();
@ -172,7 +204,7 @@ void GraphStore<V, E>::loadVertices(ShardID const& vertexShard) {
LOG(INFO) << "Loaded Vertex: " << document.toJson();
std::string vertexId = trx.extractIdString(document);
std::string vertexId = _transaction->extractIdString(document);
VertexEntry entry(vertexId);
if (storeData) {
V vertexData;
@ -185,42 +217,44 @@ void GraphStore<V, E>::loadVertices(ShardID const& vertexShard) {
LOG(ERR) << "Could not load vertex " << document.toJson();
}
}
loadEdges(entry);
loadEdges(edgeShard, entry);
_index.push_back(entry);
}
}
}
res = trx.unlockRead();
/*res = trx.unlockRead();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up shard '%s'",
vertexShard.c_str());
}
//_shardsPlanIdMap[vertexShard] =
// trx.documentCollection()->planId_as_string();
res = trx.finish(res);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'",
vertexShard.c_str());
}
}//*/
}
template <typename V, typename E>
void GraphStore<V, E>::loadEdges(VertexEntry& vertexEntry) {
void GraphStore<V, E>::loadEdges(ShardID const& shard, VertexEntry& vertexEntry) {
//_graphFormat->willUseCollection(vocbase, edgeShard, true);
const bool storeData = _graphFormat->storesEdgeData();
std::string const& _from = vertexEntry.vertexID();
const std::string _key = Utils::vertexKeyFromToValue(_from);
ShardID shard;
Utils::resolveShard(_edgeCollection.get(), Utils::edgeShardingKey, _key,
shard);
SingleCollectionTransaction* trx = readTransaction(shard);
traverser::EdgeCollectionInfo info(trx, shard, TRI_EDGE_OUT,
/*ShardID shard;
Utils::resolveShard(_edgeCollection.get(), Utils::edgeShardingKey,
_key, shard);*/
//Transaction* trx = readTransaction(shard);
traverser::EdgeCollectionInfo info(_transaction, shard, TRI_EDGE_OUT,
StaticStrings::FromString, 0);
ManagedDocumentResult mmdr(trx);
ManagedDocumentResult mmdr(_transaction);
auto cursor = info.getEdges(_from, &mmdr);
if (cursor->failed()) {
THROW_ARANGO_EXCEPTION_FORMAT(cursor->code,
@ -240,7 +274,7 @@ void GraphStore<V, E>::loadEdges(VertexEntry& vertexEntry) {
cursor->getMoreMptr(result, 1000);
for (auto const& element : result) {
TRI_voc_rid_t revisionId = element.revisionId();
if (collection->readRevision(trx, mmdr, revisionId)) {
if (collection->readRevision(_transaction, mmdr, revisionId)) {
VPackSlice document(mmdr.vpack());
if (document.isExternal()) {
document = document.resolveExternal();
@ -276,7 +310,7 @@ void GraphStore<V, E>::loadEdges(VertexEntry& vertexEntry) {
}*/
}
template <typename V, typename E>
/*template <typename V, typename E>
SingleCollectionTransaction* GraphStore<V, E>::writeTransaction(ShardID const& shard) {
auto it = _transactions.find(shard);
@ -295,11 +329,28 @@ SingleCollectionTransaction* GraphStore<V, E>::writeTransaction(ShardID const& s
_transactions[shard] = trx.get();
return trx.release();
}
}
}*/
template <typename V, typename E>
void GraphStore<V,E>::storeResults() {
std::vector<std::string> readColls, writeColls;
for (auto shard : _workerState->localVertexShardIDs() ) {
writeColls.push_back(shard);
}
//for (auto shard : _workerState->localEdgeShardIDs() ) {
// writeColls(shard);
//}
double lockTimeout =
(double)(TRI_TRANSACTION_DEFAULT_LOCK_TIMEOUT / 1000000ULL);
_transaction = new ExplicitTransaction(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()),
readColls, writeColls,
lockTimeout, false, false);
int res = _transaction->begin();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(res);
}
OperationOptions options;
for (auto& vertexEntry : _index) {
@ -316,7 +367,6 @@ void GraphStore<V,E>::storeResults() {
std::string shard;
Utils::resolveShard(collInfo.get(), StaticStrings::KeyString, _key, shard);
SingleCollectionTransaction* trx = writeTransaction(shard);
void* data = mutableVertexData(&vertexEntry);
VPackBuilder b;
@ -325,7 +375,7 @@ void GraphStore<V,E>::storeResults() {
_graphFormat->buildVertexDocument(b, data, sizeof(V));
b.close();
OperationResult result = trx->update(shard, b.slice(), options);
OperationResult result = _transaction->update(shard, b.slice(), options);
if (result.code != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION(result.code);
}

View File

@ -26,12 +26,13 @@
#include <cstdint>
#include <cstdio>
#include "Cluster/ClusterInfo.h"
#include "VocBase/voc-types.h"
#include "GraphFormat.h"
struct TRI_vocbase_t;
namespace arangodb {
class SingleCollectionTransaction;
class Transaction;
class LogicalCollection;
namespace pregel {
@ -43,6 +44,8 @@ struct EdgeEntry {
// size_t _nextEntryOffset;
// size_t _dataSize;
std::string _toVertexID;
TRI_voc_cid_t sourceShardID;
TRI_voc_cid_t targetShardID;
E _data;
// size_t _vertexIDSize;
// char _vertexID[1];
@ -246,15 +249,14 @@ class GraphStore {
VocbaseGuard _vocbaseGuard;
const WorkerState* _workerState;
const std::unique_ptr<GraphFormat<V, E>> _graphFormat;
std::unordered_map<std::string, SingleCollectionTransaction*> _transactions;
std::shared_ptr<LogicalCollection> _edgeCollection;
SingleCollectionTransaction* readTransaction(ShardID const& shard);
SingleCollectionTransaction* writeTransaction(ShardID const& shard);
Transaction *_transaction;
//SingleCollectionTransaction* readTransaction(ShardID const& shard);
//SingleCollectionTransaction* writeTransaction(ShardID const& shard);
void cleanupTransactions();
void loadVertices(ShardID const& vertexShard);
void loadEdges(VertexEntry& entry);
void loadVertices(ShardID const& vertexShard, ShardID const& edgeShard);
void loadEdges(ShardID const& edgeShard, VertexEntry& entry);
public:
GraphStore(TRI_vocbase_t* vocbase, const WorkerState* state,

View File

@ -44,7 +44,7 @@ template <typename M>
class IncomingCache {
public:
IncomingCache(MessageFormat<M> const* format, MessageCombiner<M> const* combiner)
: _format(format), _combiner(combiner), _receivedMessageCount(0) {}
: _receivedMessageCount(0), _format(format), _combiner(combiner) {}
~IncomingCache();
void parseMessages(VPackSlice messages);

View File

@ -27,10 +27,7 @@ In arangosh:
db._create('vertices', {numberOfShards: 2});
db._createEdgeCollection('alt_edges');
db._createEdgeCollection('edges', {numberOfShards: 2,
shardKeys:["_vertex"],
distributeShardsLike:'vertices'
});
db._createEdgeCollection('edges', {numberOfShards: 2, shardKeys:["_vertex"], distributeShardsLike:'vertices'});
arangoimp --file generated_vertices.csv --type csv --collection vertices --overwrite true --server.endpoint http+tcp://127.0.0.1:8530

View File

@ -45,9 +45,8 @@ std::string const Utils::finalizeExecutionPath = "finalizeExecution";
std::string const Utils::executionNumberKey = "exn";
std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap";
std::string const Utils::edgeCollectionPlanIdKey = "edgePlanId";
std::string const Utils::vertexShardsListKey = "vertexShards";
std::string const Utils::edgeShardsListKey = "edgeShards";
std::string const Utils::vertexShardsKey = "vertexShards";
std::string const Utils::edgeShardsKey = "edgeShards";
std::string const Utils::coordinatorIdKey = "coordinatorId";
std::string const Utils::algorithmKey = "algorithm";

View File

@ -51,9 +51,8 @@ class Utils {
static std::string const algorithmKey;
static std::string const coordinatorIdKey;
static std::string const collectionPlanIdMapKey;
static std::string const edgeCollectionPlanIdKey;
static std::string const vertexShardsListKey;
static std::string const edgeShardsListKey;
static std::string const vertexShardsKey;
static std::string const edgeShardsKey;
static std::string const globalSuperstepKey;
static std::string const messagesKey;

View File

@ -31,14 +31,13 @@ using namespace arangodb::pregel;
WorkerState::WorkerState(DatabaseID dbname, VPackSlice params)
: _database(dbname) {
VPackSlice coordID = params.get(Utils::coordinatorIdKey);
VPackSlice vertexShardIDs = params.get(Utils::vertexShardsListKey);
VPackSlice edgeShardIDs = params.get(Utils::edgeShardsListKey);
VPackSlice vertexShardMap = params.get(Utils::vertexShardsKey);
VPackSlice edgeShardMap = params.get(Utils::edgeShardsKey);
VPackSlice execNum = params.get(Utils::executionNumberKey);
VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey);
VPackSlice edgePlanID = params.get(Utils::edgeCollectionPlanIdKey);
if (!coordID.isString() || !vertexShardIDs.isArray() ||
!edgeShardIDs.isArray() || !execNum.isInteger() ||
!collectionPlanIdMap.isObject() || !edgePlanID.isString()) {
if (!coordID.isString() || !edgeShardMap.isObject() ||
!vertexShardMap.isObject() || !execNum.isInteger() ||
!collectionPlanIdMap.isObject()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Supplied bad parameters to worker");
}
@ -47,22 +46,25 @@ WorkerState::WorkerState(DatabaseID dbname, VPackSlice params)
//_vertexCollectionName = vertexCollName.copyString();
//_vertexCollectionPlanId = vertexCollPlanId.copyString();
LOG(INFO) << "Local Shards:";
VPackArrayIterator vertices(vertexShardIDs);
for (VPackSlice shardSlice : vertices) {
ShardID name = shardSlice.copyString();
_localVertexShardIDs.push_back(name);
LOG(INFO) << name;
for (auto const& pair : VPackObjectIterator(vertexShardMap)) {
std::vector<ShardID> shards;
for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) {
ShardID shard = shardSlice.copyString();
shards.push_back(shard);
_localVertexShardIDs.push_back(shard);
}
_vertexCollectionShards.emplace(pair.key.copyString(), shards);
}
VPackArrayIterator edges(edgeShardIDs);
for (VPackSlice shardSlice : edges) {
ShardID name = shardSlice.copyString();
_localEdgeShardIDs.push_back(name);
LOG(INFO) << name;
for (auto const& pair : VPackObjectIterator(edgeShardMap)) {
std::vector<ShardID> shards;
for (VPackSlice shardSlice : VPackArrayIterator(pair.value)) {
shards.push_back(shardSlice.copyString());
}
_edgeCollectionShards.emplace(pair.key.copyString(), shards);
}
for (auto const& it : VPackObjectIterator(collectionPlanIdMap)) {
_collectionPlanIdMap.emplace(it.key.copyString(), it.value.copyString());
}
_edgeCollectionPlanId = edgePlanID.copyString();
}

View File

@ -52,21 +52,21 @@ class WorkerState {
inline std::string const& database() const { return _database; }
inline std::vector<ShardID> const& localVertexShardIDs() const {
return _localVertexShardIDs;
inline std::map<CollectionID, std::vector<ShardID>> const& vertexCollectionShards() const {
return _vertexCollectionShards;
}
inline std::vector<ShardID> const& localEdgeShardIDs() const {
return _localEdgeShardIDs;
inline std::map<CollectionID, std::vector<ShardID>> const& edgeCollectionShards() const {
return _edgeCollectionShards;
}
std::map<CollectionID, std::string> const& collectionPlanIdMap() const {
inline std::map<CollectionID, std::string> const& collectionPlanIdMap() const {
return _collectionPlanIdMap;
};
std::string const& edgeCollectionPlanId() const {
return _edgeCollectionPlanId;
}
inline std::vector<ShardID> const& localVertexShardIDs() const {
return _localVertexShardIDs;
};
// inline uint64_t numWorkerThreads() {
// return _numWorkerThreads;
@ -79,9 +79,10 @@ class WorkerState {
std::string _coordinatorId;
const std::string _database;
std::vector<ShardID> _localVertexShardIDs, _localEdgeShardIDs;
std::vector<ShardID> _localVertexShardIDs;
std::map<CollectionID, std::vector<ShardID>> _vertexCollectionShards, _edgeCollectionShards;
std::map<std::string, std::string> _collectionPlanIdMap;
std::string _edgeCollectionPlanId;
};
}
}

View File

@ -27,7 +27,7 @@
#include "Basics/Common.h"
#include "Utils/Transaction.h"
#include "Utils/V8TransactionContext.h"
#include "Utils/TransactionContext.h"
#include "VocBase/ticks.h"
#include "VocBase/transaction.h"
@ -39,7 +39,7 @@ class ExplicitTransaction : public Transaction {
/// @brief create the transaction
//////////////////////////////////////////////////////////////////////////////
ExplicitTransaction(std::shared_ptr<V8TransactionContext> transactionContext,
ExplicitTransaction(std::shared_ptr<TransactionContext> transactionContext,
std::vector<std::string> const& readCollections,
std::vector<std::string> const& writeCollections,
double lockTimeout, bool waitForSync,

View File

@ -1848,16 +1848,20 @@ static void JS_PregelStart(v8::FunctionCallbackInfo<v8::Value> const& args) {
"_pregelStart(<vertexCollection>, <edgeCollection>, <algorithm>[, "
"{steps:100, ...}]");
}
std::vector<std::string> vertices;
if (args[0]->IsArray()) {
v8::Handle<v8::Array> array = v8::Handle<v8::Array>::Cast(args[0]);
auto parse = [](v8::Local<v8::Value> const& value, std::vector<std::string> &out) {
v8::Handle<v8::Array> array = v8::Handle<v8::Array>::Cast(value);
uint32_t const n = array->Length();
for (uint32_t i = 0; i < n; ++i) {
v8::Handle<v8::Value> obj = array->Get(i);
if (obj->IsString()) {
vertices.push_back(TRI_ObjectToString(obj));
out.push_back(TRI_ObjectToString(obj));
}
}
};
std::vector<std::string> vertices, edges;
if (args[0]->IsArray()) {
parse(args[0], vertices);
} else if (args[0]->IsString()) {
vertices.push_back(TRI_ObjectToString(args[0]));
} else {
@ -1866,10 +1870,16 @@ static void JS_PregelStart(v8::FunctionCallbackInfo<v8::Value> const& args) {
if (vertices.size() == 0) {
TRI_V8_THROW_EXCEPTION_USAGE("Specify at least one vertex collection");
}
if (!args[1]->IsString()) {
TRI_V8_THROW_EXCEPTION_USAGE("Specify an edge collection to use");
if (args[1]->IsArray()) {
parse(args[1], edges);
} else if (args[1]->IsString()) {
edges.push_back(TRI_ObjectToString(args[1]));
} else {
TRI_V8_THROW_EXCEPTION_USAGE("Specify an array of edge collections (or a string)");
}
if (edges.size() == 0) {
TRI_V8_THROW_EXCEPTION_USAGE("Specify at least one edge collection");
}
std::string edgeCName(TRI_ObjectToString(args[1]));
std::string algorithm = TRI_ObjectToString(args[2]);
VPackBuilder paramBuilder;
if (argLength >= 4 && args[3]->IsObject()) {
@ -1879,14 +1889,11 @@ static void JS_PregelStart(v8::FunctionCallbackInfo<v8::Value> const& args) {
}
}
LOG(INFO) << "Called _pregelStart(" << vertices[0] << "," << edgeCName << ")";
if (ServerState::instance()->isCoordinator()) {
LOG(INFO) << "Called as a controller";
TRI_vocbase_t* vocbase = GetContextVocBase(isolate);
std::vector<std::shared_ptr<LogicalCollection>> vColls;
std::shared_ptr<LogicalCollection> edgeCollection;
std::vector<std::shared_ptr<LogicalCollection>> vColls, eColls;
try {
for (std::string const& name : vertices) {
auto coll =
@ -1901,17 +1908,20 @@ static void JS_PregelStart(v8::FunctionCallbackInfo<v8::Value> const& args) {
}
vColls.push_back(coll);
}
edgeCollection =
ClusterInfo::instance()->getCollection(vocbase->name(), edgeCName);
if (edgeCollection->isSystem()) {
TRI_V8_THROW_EXCEPTION_USAGE(
"Cannot use pregel on system collection");
}
std::vector<std::string> eKeys = edgeCollection->shardKeys();
if (eKeys.size() != 1 || eKeys[0] != "_vertex") {
for (std::string const& name : edges) {
auto coll =
ClusterInfo::instance()->getCollection(vocbase->name(), name);
if (coll->isSystem()) {
TRI_V8_THROW_EXCEPTION_USAGE(
"Cannot use pregel on system collection");
}
std::vector<std::string> eKeys = coll->shardKeys();
if (eKeys.size() != 1 || eKeys[0] != "_vertex") {
TRI_V8_THROW_EXCEPTION_USAGE(
"Edge collection needs to be sharded after '_vertex', or use "
"smart graphs");
}
eColls.push_back(coll);
}
} catch (...) {
TRI_V8_THROW_EXCEPTION_USAGE("Collections do not exist");
@ -1921,7 +1931,7 @@ static void JS_PregelStart(v8::FunctionCallbackInfo<v8::Value> const& args) {
pregel::Conductor* c = new pregel::Conductor(en,
vocbase,
vColls,
edgeCollection,
eColls,
algorithm);
pregel::PregelFeature::instance()->addExecution(c, en);
c->start(paramBuilder.slice());