1
0
Fork 0

Better GraphStore

This commit is contained in:
Simon Grätzer 2016-11-12 21:55:36 +01:00
parent 65b07d7f15
commit 7e435b2dfa
19 changed files with 217 additions and 311 deletions

View File

@ -347,7 +347,7 @@ SET(ARANGOD_SOURCES
VocBase/transaction.cpp VocBase/transaction.cpp
VocBase/vocbase.cpp VocBase/vocbase.cpp
Wal/AllocatorThread.cpp Wal/AllocatorThread.cpp
Wal/CollectorThread.cpp Wal/CollectorThread.cppb
Wal/DocumentOperation.cpp Wal/DocumentOperation.cpp
Wal/Logfile.cpp Wal/Logfile.cpp
Wal/LogfileManager.cpp Wal/LogfileManager.cpp
@ -365,7 +365,6 @@ SET(ARANGOD_SOURCES
Pregel/WorkerState.cpp Pregel/WorkerState.cpp
Pregel/GraphStore.cpp Pregel/GraphStore.cpp
Pregel/Utils.cpp Pregel/Utils.cpp
Pregel/ResultWriter.cpp
Pregel/Algos/SSSP.cpp Pregel/Algos/SSSP.cpp
Pregel/Algos/PageRank.cpp Pregel/Algos/PageRank.cpp
${ADDITIONAL_BIN_ARANGOD_SOURCES} ${ADDITIONAL_BIN_ARANGOD_SOURCES}

View File

@ -64,17 +64,12 @@ Conductor::Conductor(
} }
} }
Conductor::~Conductor() { Conductor::~Conductor() {}
/*for (auto const &it : _aggregators) {
delete(it.second);
}
_aggregators.clear();*/
}
static void printResults(std::vector<ClusterCommRequest> const& requests) { static void printResults(std::vector<ClusterCommRequest> const& requests) {
for (auto const& req : requests) { for (auto const& req : requests) {
auto& res = req.result; auto& res = req.result;
if (res.status == CL_COMM_RECEIVED) { if (res.status == CL_COMM_RECEIVED && res.answer_code != rest::ResponseCode::OK) {
LOG(INFO) << res.answer->payload().toJson(); LOG(INFO) << res.answer->payload().toJson();
} }
} }
@ -123,7 +118,6 @@ void Conductor::start(VPackSlice userConfig) {
LOG(WARN) << "Collection does not contain vertices"; LOG(WARN) << "Collection does not contain vertices";
} }
} }
collectionPlanIdMap[_edgeCollection->name()] = _edgeCollection->planId_as_string();
edgeCount = Utils::countDocuments(_vocbaseGuard.vocbase(), _edgeCollection->name()); edgeCount = Utils::countDocuments(_vocbaseGuard.vocbase(), _edgeCollection->name());
if (edgeCount > 0) { if (edgeCount > 0) {
resolveShards(_edgeCollection.get(), edgeServerMap); resolveShards(_edgeCollection.get(), edgeServerMap);
@ -166,6 +160,8 @@ void Conductor::start(VPackSlice userConfig) {
b.add(VPackValue(eit)); b.add(VPackValue(eit));
} }
b.close(); b.close();
b.add(Utils::edgeCollectionPlanIdKey,
VPackValue(_edgeCollection->planId_as_string()));
b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object)); b.add(Utils::collectionPlanIdMapKey, VPackValue(VPackValueType::Object));
for (auto const& pair : collectionPlanIdMap) { for (auto const& pair : collectionPlanIdMap) {
b.add(pair.first, VPackValue(pair.second)); b.add(pair.first, VPackValue(pair.second));
@ -258,14 +254,14 @@ void Conductor::finishedGlobalStep(VPackSlice& data) {
b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep)); b.add(Utils::globalSuperstepKey, VPackValue(_globalSuperstep));
b.close(); b.close();
sendToAllDBServers(baseUrl + Utils::finalizeExecutionPath, b.slice()); sendToAllDBServers(baseUrl + Utils::finalizeExecutionPath, b.slice());
_state = ExecutionState::FINISHED; _state = ExecutionState::DONE;
} else { // trigger next superstep } else { // trigger next superstep
startGlobalStep(); startGlobalStep();
} }
} }
} }
void Conductor::cancel() { _state = ExecutionState::ERROR; } void Conductor::cancel() { _state = ExecutionState::CANCELED; }
int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) { int Conductor::sendToAllDBServers(std::string path, VPackSlice const& config) {
ClusterComm* cc = ClusterComm::instance(); ClusterComm* cc = ClusterComm::instance();

View File

@ -37,7 +37,7 @@ namespace pregel {
class Conductor { class Conductor {
public: public:
enum ExecutionState { RUNNING, FINISHED, ERROR }; enum ExecutionState { RUNNING, DONE, CANCELED};
Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase, Conductor(uint64_t executionNumber, TRI_vocbase_t* vocbase,
std::vector<std::shared_ptr<LogicalCollection>> vertexCollections, std::vector<std::shared_ptr<LogicalCollection>> vertexCollections,

View File

@ -35,6 +35,8 @@ namespace pregel {
template <typename V, typename E> template <typename V, typename E>
struct GraphFormat { struct GraphFormat {
virtual bool storesVertexData() const { return true; }
virtual bool storesEdgeData() const { return true; }
virtual size_t estimatedVertexSize() const { return sizeof(V); }; virtual size_t estimatedVertexSize() const { return sizeof(V); };
virtual size_t estimatedEdgeSize() const { return sizeof(E); }; virtual size_t estimatedEdgeSize() const { return sizeof(E); };
@ -42,11 +44,10 @@ struct GraphFormat {
size_t maxSize) = 0; size_t maxSize) = 0;
virtual size_t copyEdgeData(arangodb::velocypack::Slice edgeDocument, void* targetPtr, virtual size_t copyEdgeData(arangodb::velocypack::Slice edgeDocument, void* targetPtr,
size_t maxSize) = 0; size_t maxSize) = 0;
virtual V readVertexData(void* ptr) = 0; virtual V readVertexData(void* ptr) = 0;
virtual E readEdgeData(void* ptr) = 0; virtual E readEdgeData(void* ptr) = 0;
virtual bool storesVertexData() const { return true; }
virtual bool storesEdgeData() const { return true; }
}; };
class IntegerGraphFormat : public GraphFormat<int64_t, int64_t> { class IntegerGraphFormat : public GraphFormat<int64_t, int64_t> {

View File

@ -24,7 +24,6 @@
#include "Basics/Exceptions.h" #include "Basics/Exceptions.h"
#include "Cluster/ClusterInfo.h" #include "Cluster/ClusterInfo.h"
#include "Cluster/ClusterInfo.h"
#include "Indexes/EdgeIndex.h" #include "Indexes/EdgeIndex.h"
#include "Indexes/Index.h" #include "Indexes/Index.h"
#include "Utils/OperationCursor.h" #include "Utils/OperationCursor.h"
@ -35,26 +34,33 @@
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/ticks.h" #include "VocBase/ticks.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include "WorkerState.h"
#include "Utils.h"
using namespace arangodb; using namespace arangodb;
using namespace arangodb::pregel; using namespace arangodb::pregel;
template <typename V, typename E> template <typename V, typename E>
GraphStore<V, E>::GraphStore( GraphStore<V, E>::GraphStore(
std::vector<ShardID> const& vertexShards, TRI_vocbase_t* vocbase,
std::vector<ShardID> const& edgeShards, TRI_vocbase_t* vocbase, const WorkerState* state,
GraphFormat<V, E> *graphFormat) GraphFormat<V, E> *graphFormat)
: _graphFormat(graphFormat) { : _vocbaseGuard(vocbase), _workerState(state), _graphFormat(graphFormat) {
for (auto& shard : vertexShards) {
lookupVertices(shard, vocbase);
} _edgeCollection = ClusterInfo::instance()->getCollection(vocbase->name(),
for (auto& shard : edgeShards) { state->edgeCollectionPlanId());
lookupEdges(shard, vocbase);
for (auto& shard : state->localVertexShardIDs()) {
loadVertices(shard);
} }
cleanupTransactions();
} }
template <typename V, typename E> template <typename V, typename E>
GraphStore<V, E>::~GraphStore() {} GraphStore<V, E>::~GraphStore() {
cleanupTransactions();
}
template <typename V, typename E> template <typename V, typename E>
std::vector<VertexEntry>& GraphStore<V, E>::vertexIterator() { std::vector<VertexEntry>& GraphStore<V, E>::vertexIterator() {
@ -87,25 +93,11 @@ EdgeIterator<E> GraphStore<V, E>::edgeIterator(VertexEntry const* entry) {
} }
template <typename V, typename E> template <typename V, typename E>
void GraphStore<V, E>::cleanupReadTransactions() { void GraphStore<V, E>::loadVertices(ShardID const& vertexShard) {
/*for (auto const& it : _readTrxList) { // clean transactions
if (it->getStatus() == TRI_TRANSACTION_RUNNING) {
if (it->commit() != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "Pregel worker: Failed to commit on a read transaction";
}
}
delete (it);
}
_readTrxList.clear();*/
}
template <typename V, typename E>
void GraphStore<V, E>::lookupVertices(ShardID const& vertexShard,
TRI_vocbase_t* vocbase) {
//_graphFormat->willUseCollection(vocbase, vertexShard, false); //_graphFormat->willUseCollection(vocbase, vertexShard, false);
bool storeData = _graphFormat->storesVertexData(); bool storeData = _graphFormat->storesVertexData();
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(vocbase), SingleCollectionTransaction trx(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()),
vertexShard, TRI_TRANSACTION_READ); vertexShard, TRI_TRANSACTION_READ);
int res = trx.begin(); int res = trx.begin();
@ -162,6 +154,7 @@ void GraphStore<V, E>::lookupVertices(ShardID const& vertexShard,
LOG(ERR) << "Could not load vertex " << document.toJson(); LOG(ERR) << "Could not load vertex " << document.toJson();
} }
} }
loadEdges(entry);
_index.push_back(entry); _index.push_back(entry);
} }
} }
@ -173,7 +166,7 @@ void GraphStore<V, E>::lookupVertices(ShardID const& vertexShard,
vertexShard.c_str()); vertexShard.c_str());
} }
_shardsPlanIdMap[vertexShard] = trx.documentCollection()->planId_as_string(); //_shardsPlanIdMap[vertexShard] = trx.documentCollection()->planId_as_string();
res = trx.finish(res); res = trx.finish(res);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'", THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up vertices '%s'",
@ -184,86 +177,107 @@ void GraphStore<V, E>::lookupVertices(ShardID const& vertexShard,
} }
template <typename V, typename E> template <typename V, typename E>
void GraphStore<V, E>::lookupEdges(ShardID const& edgeShard, SingleCollectionTransaction* GraphStore<V, E>::edgeTransaction(ShardID const& shard) {
TRI_vocbase_t* vocbase) { auto it = _transactions.find(shard);
if (it != _transactions.end()) {
return it->second;
} else {
auto trx =
std::make_unique<SingleCollectionTransaction>(StandaloneTransactionContext::Create(_vocbaseGuard.vocbase()),
shard,
TRI_TRANSACTION_READ);
int res = trx->begin();
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "during transaction of shard '%s'", shard.c_str());
}
_transactions[shard] = trx.get();
return trx.release();
}
}
template <typename V, typename E>
void GraphStore<V, E>::cleanupTransactions() {
for (auto const& it : _transactions) { // clean transactions
if (it.second->getStatus() == TRI_TRANSACTION_RUNNING) {
if (it.second->commit() != TRI_ERROR_NO_ERROR) {
LOG(WARN) << "Pregel worker: Failed to commit on a read transaction";
}
}
delete (it.second);
}
_transactions.clear();
}
template <typename V, typename E>
void GraphStore<V, E>::loadEdges(VertexEntry &vertexEntry) {
//_graphFormat->willUseCollection(vocbase, edgeShard, true); //_graphFormat->willUseCollection(vocbase, edgeShard, true);
const bool storeData = _graphFormat->storesEdgeData(); const bool storeData = _graphFormat->storesEdgeData();
std::string const& _from = vertexEntry.vertexID();
const std::string _key = Utils::vertexKeyFromToValue(_from);
ShardID shard;
Utils::resolveShard(_edgeCollection.get(), Utils::edgeShardingKey,
_key, shard);
SingleCollectionTransaction *trx = edgeTransaction(shard);
traverser::EdgeCollectionInfo info(trx, shard, TRI_EDGE_OUT,
StaticStrings::FromString, 0);
SingleCollectionTransaction trx(StandaloneTransactionContext::Create(vocbase),
edgeShard, TRI_TRANSACTION_READ); ManagedDocumentResult mmdr(trx);
auto cursor = info.getEdges(_from, &mmdr);
int res = trx.begin(); if (cursor->failed()) {
if (res != TRI_ERROR_NO_ERROR) { THROW_ARANGO_EXCEPTION_FORMAT(cursor->code,
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges '%s'", "while looking up edges '%s' from %s",
edgeShard.c_str()); _from.c_str(), shard.c_str());
} }
auto info = std::make_unique<arangodb::traverser::EdgeCollectionInfo>( LogicalCollection* collection = cursor->collection();
&trx, edgeShard, TRI_EDGE_OUT, StaticStrings::FromString, 0); std::vector<IndexLookupResult> result;
result.reserve(1000);
for (auto& vertexEntry : _index) { while (cursor->hasMore()) {
if (vertexEntry._edgeCount != 0) { if (vertexEntry._edgeCount == 0) {
continue; vertexEntry._edgeDataOffset = _edges.size();
} }
std::string const& _from = vertexEntry.vertexID(); cursor->getMoreMptr(result, 1000);
ManagedDocumentResult mmdr(&trx); for (auto const& element : result) {
auto cursor = info->getEdges(_from, &mmdr); TRI_voc_rid_t revisionId = element.revisionId();
if (cursor->failed()) { if (collection->readRevision(trx, mmdr, revisionId)) {
THROW_ARANGO_EXCEPTION_FORMAT(cursor->code, VPackSlice document(mmdr.vpack());
"while looking up edges '%s' from %s", if (document.isExternal()) {
_from.c_str(), edgeShard.c_str()); document = document.resolveExternal();
}
LogicalCollection* collection = cursor->collection();
std::vector<IndexLookupResult> result;
result.reserve(1000);
while (cursor->hasMore()) {
if (vertexEntry._edgeCount == 0) {
vertexEntry._edgeDataOffset = _edges.size();
}
cursor->getMoreMptr(result, 1000);
for (auto const& element : result) {
TRI_voc_rid_t revisionId = element.revisionId();
if (collection->readRevision(&trx, mmdr, revisionId)) {
VPackSlice document(mmdr.vpack());
if (document.isExternal()) {
document = document.resolveExternal();
}
LOG(INFO) << "Loaded Edge: " << document.toJson();
std::string toVertexID =
document.get(StaticStrings::ToString).copyString();
vertexEntry._edgeCount += 1;
if (storeData) {
E edgeData;
size_t size =
_graphFormat->copyEdgeData(document, &edgeData, sizeof(E));
if (size > 0) {
_edges.emplace_back(toVertexID, edgeData);
} else {
LOG(ERR) << "Trouble when reading data for edge "
<< document.toJson();
}
} else {
_edges.emplace_back(toVertexID);
}
} }
// ====== actual loading ======
LOG(INFO) << "Loaded Edge: " << document.toJson();
std::string toVertexID =
document.get(StaticStrings::ToString).copyString();
vertexEntry._edgeCount += 1;
if (storeData) {
E edgeData;
size_t size =
_graphFormat->copyEdgeData(document, &edgeData, sizeof(E));
if (size > 0) {
_edges.emplace_back(toVertexID, edgeData);
} else {
LOG(ERR) << "Trouble when reading data for edge "
<< document.toJson();
}
} else {
_edges.emplace_back(toVertexID);
}
} }
} }
} }
res = trx.finish(res); /*res = trx.finish(res);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges from '%s'", THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up edges from '%s'",
edgeShard.c_str()); edgeShard.c_str());
} }*/
//_readTrxList.push_back(trx.get());
// trx.release();
} }
template class arangodb::pregel::GraphStore<int64_t, int64_t>; template class arangodb::pregel::GraphStore<int64_t, int64_t>;

View File

@ -29,7 +29,10 @@
#include "GraphFormat.h" #include "GraphFormat.h"
struct TRI_vocbase_t; struct TRI_vocbase_t;
namespace arangodb { namespace arangodb {
class SingleCollectionTransaction;
class LogicalCollection;
namespace pregel { namespace pregel {
/// @brief header entry for the edge file /// @brief header entry for the edge file
@ -224,7 +227,9 @@ class VertexIterator {
return _current != other._current; return _current != other._current;
} }
}; };
class WorkerState;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief carry graph data for a worker job /// @brief carry graph data for a worker job
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -234,24 +239,30 @@ class GraphStore {
// int _indexFd, _vertexFd, _edgeFd; // int _indexFd, _vertexFd, _edgeFd;
// void *_indexMapping, *_vertexMapping, *_edgeMapping; // void *_indexMapping, *_vertexMapping, *_edgeMapping;
// size_t _indexSize, _vertexSize, _edgeSize; // size_t _indexSize, _vertexSize, _edgeSize;
std::map<std::string, std::string> _shardsPlanIdMap; //std::map<std::string, std::string> _shardsPlanIdMap;
void lookupVertices(ShardID const& vertexShard, TRI_vocbase_t* vocbase);
void lookupEdges(ShardID const& edgeShardID, TRI_vocbase_t* vocbase);
void cleanupReadTransactions();
// only for demo, move to memory // only for demo, move to memory
std::vector<VertexEntry> _index; std::vector<VertexEntry> _index;
std::vector<V> _vertexData; std::vector<V> _vertexData;
std::vector<EdgeEntry<E>> _edges; std::vector<EdgeEntry<E>> _edges;
const std::unique_ptr<GraphFormat<V, E>> _graphFormat;
size_t _localVerticeCount; size_t _localVerticeCount;
size_t _localEdgeCount; size_t _localEdgeCount;
VocbaseGuard _vocbaseGuard;
const WorkerState *_workerState;
const std::unique_ptr<GraphFormat<V, E>> _graphFormat;
std::unordered_map<std::string, SingleCollectionTransaction*> _transactions;
std::shared_ptr<LogicalCollection> _edgeCollection;
void loadVertices(ShardID const& vertexShard);
SingleCollectionTransaction* edgeTransaction(ShardID const& shard);
void loadEdges(VertexEntry &entry);
void cleanupTransactions();
public: public:
GraphStore(std::vector<ShardID> const& vertexShards, GraphStore(TRI_vocbase_t* vocbase,
std::vector<ShardID> const& edgeShards, TRI_vocbase_t* vocbase, const WorkerState* state,
GraphFormat<V, E> *graphFormat); GraphFormat<V, E> *graphFormat);
~GraphStore(); ~GraphStore();

View File

@ -78,8 +78,7 @@ void IncomingCache<M>::parseMessages(VPackSlice incomingMessages) {
template <typename M> template <typename M>
void IncomingCache<M>::setDirect(std::string const& toValue, void IncomingCache<M>::setDirect(std::string const& toValue,
M const& newValue) { M const& newValue) {
{ MUTEX_LOCKER(guard, _writeLock);
CONDITION_LOCKER(guard, _writeCondition);
_receivedMessageCount++; _receivedMessageCount++;
auto vmsg = _messages.find(toValue); auto vmsg = _messages.find(toValue);
@ -88,8 +87,6 @@ void IncomingCache<M>::setDirect(std::string const& toValue,
} else { } else {
_messages[toValue] = newValue; _messages[toValue] = newValue;
} }
}
_writeCondition.signal();
} }
template <typename M> template <typename M>

View File

@ -28,8 +28,7 @@
#include <string> #include <string>
#include "Basics/Common.h" #include "Basics/Common.h"
#include "Basics/ConditionLocker.h" #include "Basics/Mutex.h"
#include "Basics/ConditionVariable.h"
#include "MessageCombiner.h" #include "MessageCombiner.h"
#include "MessageFormat.h" #include "MessageFormat.h"
@ -63,7 +62,7 @@ class IncomingCache {
size_t receivedMessageCount() { return _receivedMessageCount; } size_t receivedMessageCount() { return _receivedMessageCount; }
private: private:
arangodb::basics::ConditionVariable _writeCondition; mutable Mutex _writeLock;
std::unordered_map<std::string, M> _messages; std::unordered_map<std::string, M> _messages;
size_t _receivedMessageCount; size_t _receivedMessageCount;

View File

@ -44,7 +44,6 @@ OutgoingCache<M>::OutgoingCache (WorkerState *state,
MessageCombiner<M> *combiner, MessageCombiner<M> *combiner,
IncomingCache<M> *cache) IncomingCache<M> *cache)
: _state(state), _format(format), _combiner(combiner), _localCache(cache) { : _state(state), _format(format), _combiner(combiner), _localCache(cache) {
_ci = ClusterInfo::instance();
_baseUrl = Utils::baseUrl(_state->database()); _baseUrl = Utils::baseUrl(_state->database());
} }
@ -59,38 +58,6 @@ void OutgoingCache<M>::clear() {
_containedMessages = 0; _containedMessages = 0;
} }
static inline LogicalCollection* resolveCollection(
ClusterInfo* ci, std::string const& database,
std::string const& collectionName,
std::map<std::string, std::string> const& collectionPlanIdMap) {
auto const& it = collectionPlanIdMap.find(collectionName);
if (it != collectionPlanIdMap.end()) {
std::shared_ptr<LogicalCollection> collectionInfo(ci->getCollection(database,
it->second));
return collectionInfo.get();
}
return nullptr;
}
static inline void resolveShard(ClusterInfo* ci, LogicalCollection* info,
std::string const& vertexKey,
std::string& responsibleShard) {
bool usesDefaultShardingAttributes;
VPackBuilder partial;
partial.openObject();
partial.add(StaticStrings::KeyString, VPackValue(vertexKey));
partial.close();
LOG(INFO) << "Partial doc: " << partial.toJson();
int res =
ci->getResponsibleShard(info, partial.slice(), true, responsibleShard,
usesDefaultShardingAttributes);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(
res, "OutgoingCache could not resolve the responsible shard");
}
TRI_ASSERT(usesDefaultShardingAttributes); // should be true anyway
}
template <typename M> template <typename M>
void OutgoingCache<M>::sendMessageTo(std::string const& toValue, void OutgoingCache<M>::sendMessageTo(std::string const& toValue,
M const& data) { M const& data) {
@ -99,14 +66,15 @@ void OutgoingCache<M>::sendMessageTo(std::string const& toValue,
std::string collectionName = toValue.substr(0, pos); std::string collectionName = toValue.substr(0, pos);
LOG(INFO) << "Adding outgoing messages for " << collectionName << "/" << _key; LOG(INFO) << "Adding outgoing messages for " << collectionName << "/" << _key;
LogicalCollection* coll = resolveCollection( LogicalCollection* coll = Utils::resolveCollection(_state->database(),
_ci, _state->database(), collectionName, _state->collectionPlanIdMap()); collectionName,
_state->collectionPlanIdMap());
if (coll == nullptr) { if (coll == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Collection this messages is going to is unkown"); "Collection this messages is going to is unkown");
} }
ShardID responsibleShard; ShardID responsibleShard;
resolveShard(_ci, coll, _key, responsibleShard); Utils::resolveShard(coll, StaticStrings::KeyString, _key, responsibleShard);
LOG(INFO) << "Responsible shard: " << responsibleShard; LOG(INFO) << "Responsible shard: " << responsibleShard;
std::vector<ShardID> const& localShards = _state->localVertexShardIDs(); std::vector<ShardID> const& localShards = _state->localVertexShardIDs();

View File

@ -57,7 +57,6 @@ class OutgoingCache {
MessageFormat<M> *_format; MessageFormat<M> *_format;
MessageCombiner<M> *_combiner; MessageCombiner<M> *_combiner;
IncomingCache<M> *_localCache; IncomingCache<M> *_localCache;
ClusterInfo* _ci;
std::string _baseUrl; std::string _baseUrl;
/// @brief two stage map: shard -> vertice -> message /// @brief two stage map: shard -> vertice -> message

View File

@ -1,79 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#include "ResultWriter.h"
#include "GraphStore.h"
#include "Utils/OperationCursor.h"
#include "Utils/SingleCollectionTransaction.h"
#include "Utils/StandaloneTransactionContext.h"
#include "Utils/Transaction.h"
#include "VocBase/ticks.h"
#include "VocBase/vocbase.h"
#include <velocypack/Iterator.h>
#include <velocypack/velocypack-aliases.h>
using namespace arangodb;
using namespace arangodb::pregel;
template <typename V, typename E>
void ResultWriter<V, E>::writeResults(TRI_vocbase_t* vocbase,
GraphStore<V, E>* store) {
/*SingleCollectionTransaction
trx(StandaloneTransactionContext::Create(vocbase),
_vertexCollection, TRI_TRANSACTION_WRITE);
int res = trx.begin();
if (res != TRI_ERROR_NO_ERROR) {
LOG(ERR) << "cannot start transaction to load authentication";
return;
}
OperationResult result;
OperationOptions options;
options.waitForSync = false;
options.mergeObjects = true;
auto verticeIt = store->vertexIterator();
for (auto const &pair : verticeIt) {
//TransactionBuilderLeaser b(&trx);
VPackBuilder b;
b.openObject();
b.add(StaticStrings::KeyString,
pair.second->_data.get(StaticStrings::KeyString));
b.add("value", VPackValue(pair.second->_vertexState));
b.close();
LOG(INFO) << b.toJson();
result = trx.update(_vertexCollection, b->slice(), options);
if (!result.successful()) {
THROW_ARANGO_EXCEPTION_FORMAT(result.code, "while looking up graph '%s'",
_vertexCollection.c_str());
}
}
// Commit or abort.
res = trx.finish(result.code);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_FORMAT(res, "while looking up graph '%s'",
_vertexCollection.c_str());
}*/
}

View File

@ -1,47 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2016 ArangoDB GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Simon Grätzer
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_PREGEL_ALGO_SSSP_H
#define ARANGODB_PREGEL_ALGO_SSSP_H 1
#include "Algorithm.h"
struct TRI_vocbase_t;
namespace arangodb {
namespace pregel {
template <typename V, typename E>
class GraphStore;
template <typename V, typename E>
class ResultWriter {
bool _writeVertices = true;
bool _writeEdges = true;
bool _writeInSameCollections = true;
bool resultField;
public:
ResultWriter(VPackSlice params) {}
void writeResults(TRI_vocbase_t* vocbase, GraphStore<V, E>* store);
};
}
}
#endif

View File

@ -28,7 +28,10 @@
#include "VocBase/LogicalCollection.h" #include "VocBase/LogicalCollection.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include "VocBase/vocbase.h" #include "VocBase/vocbase.h"
#include "Cluster/ClusterInfo.h"
#include "VocBase/LogicalCollection.h"
using namespace arangodb;
using namespace arangodb::pregel; using namespace arangodb::pregel;
std::string const Utils::edgeShardingKey = "_vertex"; std::string const Utils::edgeShardingKey = "_vertex";
@ -41,9 +44,8 @@ std::string const Utils::messagesPath = "messages";
std::string const Utils::finalizeExecutionPath = "finalizeExecution"; std::string const Utils::finalizeExecutionPath = "finalizeExecution";
std::string const Utils::executionNumberKey = "exn"; std::string const Utils::executionNumberKey = "exn";
std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap"; std::string const Utils::collectionPlanIdMapKey = "collectionPlanIdMap";
std::string const Utils::edgeCollectionPlanIdKey = "edgePlanId";
std::string const Utils::vertexShardsListKey = "vertexShards"; std::string const Utils::vertexShardsListKey = "vertexShards";
std::string const Utils::edgeShardsListKey = "edgeShards"; std::string const Utils::edgeShardsListKey = "edgeShards";
@ -94,3 +96,37 @@ int64_t Utils::countDocuments(TRI_vocbase_t* vocbase,
TRI_ASSERT(s.isNumber()); TRI_ASSERT(s.isNumber());
return s.getInt(); return s.getInt();
} }
LogicalCollection* Utils::resolveCollection(std::string const& database,
std::string const& collectionName,
std::map<std::string, std::string> const& collectionPlanIdMap) {
ClusterInfo* ci = ClusterInfo::instance();
auto const& it = collectionPlanIdMap.find(collectionName);
if (it != collectionPlanIdMap.end()) {
std::shared_ptr<LogicalCollection> collectionInfo(ci->getCollection(database,
it->second));
return collectionInfo.get();
}
return nullptr;
}
void Utils::resolveShard(LogicalCollection* info,
std::string const& shardKey,
std::string const& vertexKey,
std::string& responsibleShard) {
ClusterInfo* ci = ClusterInfo::instance();
bool usesDefaultShardingAttributes;
VPackBuilder partial;
partial.openObject();
partial.add(shardKey, VPackValue(vertexKey));
partial.close();
LOG(INFO) << "Partial doc: " << partial.toJson();
int res =
ci->getResponsibleShard(info, partial.slice(), true, responsibleShard,
usesDefaultShardingAttributes);
if (res != TRI_ERROR_NO_ERROR) {
THROW_ARANGO_EXCEPTION_MESSAGE(
res, "could not resolve the responsible shard");
}
TRI_ASSERT(usesDefaultShardingAttributes); // should be true anyway
}

View File

@ -27,10 +27,11 @@
#include "Basics/Common.h" #include "Basics/Common.h"
struct TRI_vocbase_t; struct TRI_vocbase_t;
namespace arangodb { namespace arangodb {
class LogicalCollection;
namespace pregel { namespace pregel {
class Utils { class Utils {
Utils() = delete; Utils() = delete;
@ -49,8 +50,8 @@ class Utils {
static std::string const executionNumberKey; static std::string const executionNumberKey;
static std::string const algorithmKey; static std::string const algorithmKey;
static std::string const coordinatorIdKey; static std::string const coordinatorIdKey;
static std::string const collectionPlanIdMapKey; static std::string const collectionPlanIdMapKey;
static std::string const edgeCollectionPlanIdKey;
static std::string const vertexShardsListKey; static std::string const vertexShardsListKey;
static std::string const edgeShardsListKey; static std::string const edgeShardsListKey;
@ -71,6 +72,14 @@ class Utils {
static int64_t countDocuments(TRI_vocbase_t* vocbase, static int64_t countDocuments(TRI_vocbase_t* vocbase,
std::string const& collection); std::string const& collection);
static LogicalCollection* resolveCollection(std::string const& database,
std::string const& collectionName,
std::map<std::string, std::string>
const& collectionPlanIdMap);
static void resolveShard(LogicalCollection* info,
std::string const& shardKey,
std::string const& vertexKey,
std::string& responsibleShard);
}; };
} }
} }

View File

@ -55,13 +55,13 @@ IWorker* IWorker::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) {
VPackSlice userParams = body.get(Utils::userParametersKey); VPackSlice userParams = body.get(Utils::userParametersKey);
IWorker* worker = nullptr; IWorker* worker = nullptr;
if (algorithm.compareString("sssp") == 0) { if (algorithm.compareString("sssp") == 0) {
std::unique_ptr<algos::SSSPAlgorithm> algo(new algos::SSSPAlgorithm()); worker = new Worker<int64_t, int64_t, int64_t>(vocbase,
worker = new Worker<int64_t, int64_t, int64_t>(vocbase, algo.get(), body); new algos::SSSPAlgorithm(),
algo.release(); body);
} else if (algorithm.compareString("pagerank") == 0) { } else if (algorithm.compareString("pagerank") == 0) {
std::unique_ptr<algos::PageRankAlgorithm> algo(new algos::PageRankAlgorithm(userParams)); worker = new Worker<float, float, float>(vocbase,
worker = new Worker<float, float, float>(vocbase, algo.get(), body); new algos::PageRankAlgorithm(userParams),
algo.release(); body);
} else { } else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Unsupported Algorithm"); "Unsupported Algorithm");
@ -73,18 +73,16 @@ template <typename V, typename E, typename M>
Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase,
Algorithm<V, E, M> *algo, Algorithm<V, E, M> *algo,
VPackSlice initConfig) VPackSlice initConfig)
: _running(true) { : _running(true), _algorithm(algo) {
VPackSlice userParams = initConfig.get(Utils::userParametersKey); VPackSlice userParams = initConfig.get(Utils::userParametersKey);
_state.reset(new WorkerState(vocbase->name(), initConfig)); _state.reset(new WorkerState(vocbase->name(), initConfig));
_algorithm.reset(algo);
_workerContext.reset(algo->workerContext(userParams)); _workerContext.reset(algo->workerContext(userParams));
const size_t threadNum = 1; const size_t threadNum = 1;
_workerPool.reset(new ThreadPool(static_cast<size_t>(threadNum), "Pregel Worker")); _workerPool.reset(new ThreadPool(static_cast<size_t>(threadNum), "Pregel Worker"));
_graphStore.reset(new GraphStore<V, E>(_state->localVertexShardIDs(), _graphStore.reset(new GraphStore<V, E>(vocbase,
_state->localEdgeShardIDs(), _state.get(),
vocbase,
algo->inputFormat())); algo->inputFormat()));
std::shared_ptr<MessageFormat<M>> mFormat(algo->messageFormat()); std::shared_ptr<MessageFormat<M>> mFormat(algo->messageFormat());
std::shared_ptr<MessageCombiner<M>> combiner(algo->messageCombiner()); std::shared_ptr<MessageCombiner<M>> combiner(algo->messageCombiner());

View File

@ -81,7 +81,7 @@ class Worker : public IWorker {
std::unique_ptr<AggregatorUsage> _workerAggregators; std::unique_ptr<AggregatorUsage> _workerAggregators;
void swapIncomingCaches() { void swapIncomingCaches() {
std::swap(_readCache, _writeCache); _readCache.swap(_writeCache);
_writeCache->clear(); _writeCache->clear();
} }
void workerJobIsDone(bool allVerticesHalted); void workerJobIsDone(bool allVerticesHalted);

View File

@ -36,10 +36,10 @@ WorkerState::WorkerState(DatabaseID dbname,
VPackSlice edgeShardIDs = params.get(Utils::edgeShardsListKey); VPackSlice edgeShardIDs = params.get(Utils::edgeShardsListKey);
VPackSlice execNum = params.get(Utils::executionNumberKey); VPackSlice execNum = params.get(Utils::executionNumberKey);
VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey); VPackSlice collectionPlanIdMap = params.get(Utils::collectionPlanIdMapKey);
//VPackSlice threadNum = params.get(Utils::collectionPlanIdMapKey); VPackSlice edgePlanID = params.get(Utils::edgeCollectionPlanIdKey);
if (!coordID.isString() || !vertexShardIDs.isArray() || if (!coordID.isString() || !vertexShardIDs.isArray() ||
!edgeShardIDs.isArray() || !execNum.isInteger() || !edgeShardIDs.isArray() || !execNum.isInteger() ||
!collectionPlanIdMap.isObject()) { !collectionPlanIdMap.isObject() || !edgePlanID.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER, THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Supplied bad parameters to worker"); "Supplied bad parameters to worker");
} }
@ -65,5 +65,6 @@ WorkerState::WorkerState(DatabaseID dbname,
for (auto const& it : VPackObjectIterator(collectionPlanIdMap)) { for (auto const& it : VPackObjectIterator(collectionPlanIdMap)) {
_collectionPlanIdMap[it.key.copyString()] = it.value.copyString(); _collectionPlanIdMap[it.key.copyString()] = it.value.copyString();
} }
_edgeCollectionPlanId = edgePlanID.copyString();
} }

View File

@ -64,6 +64,10 @@ class WorkerState {
return _collectionPlanIdMap; return _collectionPlanIdMap;
}; };
std::string const& edgeCollectionPlanId() const {
return _edgeCollectionPlanId;
}
//inline uint64_t numWorkerThreads() { //inline uint64_t numWorkerThreads() {
// return _numWorkerThreads; // return _numWorkerThreads;
//} //}
@ -77,6 +81,7 @@ class WorkerState {
const std::string _database; const std::string _database;
std::vector<ShardID> _localVertexShardIDs, _localEdgeShardIDs; std::vector<ShardID> _localVertexShardIDs, _localEdgeShardIDs;
std::map<std::string, std::string> _collectionPlanIdMap; std::map<std::string, std::string> _collectionPlanIdMap;
std::string _edgeCollectionPlanId;
}; };
} }
} }

View File

@ -126,9 +126,8 @@ RestStatus RestPregelHandler::execute() {
} }
} }
VPackBuilder result; VPackSlice result;
result.add(VPackValue("thanks")); generateResult(rest::ResponseCode::OK, result);
generateResult(rest::ResponseCode::OK, result.slice());
} catch (std::exception const &e) { } catch (std::exception const &e) {
LOG(ERR) << e.what(); LOG(ERR) << e.what();