1
0
Fork 0

backport: use vocbase reference instead of pointer in arangodb::pregel::GraphStore

This commit is contained in:
Vasiliy 2018-04-13 11:23:34 +03:00
parent f392925903
commit 0abd46ad73
20 changed files with 181 additions and 84 deletions

View File

@ -34,9 +34,14 @@
namespace arangodb {
MMFilesExportCursor::MMFilesExportCursor(TRI_vocbase_t* vocbase, CursorId id,
arangodb::MMFilesCollectionExport* ex, size_t batchSize,
double ttl, bool hasCount)
MMFilesExportCursor::MMFilesExportCursor(
TRI_vocbase_t& vocbase,
CursorId id,
arangodb::MMFilesCollectionExport* ex,
size_t batchSize,
double ttl,
bool hasCount
)
: Cursor(id, batchSize, ttl, hasCount),
_guard(vocbase),
_ex(ex),

View File

@ -30,16 +30,22 @@
#include "VocBase/voc-types.h"
namespace arangodb {
class MMFilesCollectionExport;
class MMFilesExportCursor final : public Cursor {
public:
MMFilesExportCursor(TRI_vocbase_t*, CursorId, arangodb::MMFilesCollectionExport*, size_t,
double, bool);
MMFilesExportCursor(
TRI_vocbase_t& vocbase,
CursorId id,
arangodb::MMFilesCollectionExport* ex,
size_t batchSize,
double ttl,
bool hasCount
);
~MMFilesExportCursor();
public:
CursorType type() const override final { return CURSOR_EXPORT; }
bool hasNext();
@ -58,6 +64,7 @@ class MMFilesExportCursor final : public Cursor {
size_t _position;
size_t const _size;
};
}
#endif

View File

@ -264,9 +264,10 @@ void MMFilesRestExportHandler::createCursor() {
TRI_ASSERT(cursors != nullptr);
Cursor* c = nullptr;
{
auto cursor = std::make_unique<MMFilesExportCursor>(
&_vocbase,
_vocbase,
TRI_NewTickServer(),
collectionExport.get(),
batchSize,

View File

@ -75,13 +75,20 @@ IAlgorithm* AlgoRegistry::createAlgorithm(std::string const& algorithm,
}
template <typename V, typename E, typename M>
std::unique_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t* vocbase,
Algorithm<V, E, M>* algo, VPackSlice body) {
/*static*/ std::unique_ptr<IWorker> AlgoRegistry::createWorker(
TRI_vocbase_t& vocbase,
Algorithm<V, E, M>* algo,
VPackSlice body
) {
return std::make_unique<Worker<V, E, M>>(vocbase, algo, body);
}
std::unique_ptr<IWorker> AlgoRegistry::createWorker(TRI_vocbase_t* vocbase, VPackSlice body) {
/*static*/std::unique_ptr<IWorker> AlgoRegistry::createWorker(
TRI_vocbase_t& vocbase,
VPackSlice body
) {
VPackSlice algoSlice = body.get(Utils::algorithmKey);
if (!algoSlice.isString()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_BAD_PARAMETER,
"Supplied bad parameters to worker");

View File

@ -28,18 +28,28 @@
#include "Worker.h"
struct TRI_vocbase_t;
namespace arangodb {
namespace pregel {
struct AlgoRegistry {
static IAlgorithm* createAlgorithm(std::string const& algorithm,
VPackSlice userParams);
static std::unique_ptr<IWorker> createWorker(TRI_vocbase_t* vocbase, VPackSlice body);
static std::unique_ptr<IWorker> createWorker(
TRI_vocbase_t& vocbase,
VPackSlice body
);
private:
template <typename V, typename E, typename M>
static std::unique_ptr<IWorker> createWorker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
VPackSlice body);
static std::unique_ptr<IWorker> createWorker(
TRI_vocbase_t& vocbase,
Algorithm<V, E, M>* algo,
VPackSlice body
);
};
}
}
#endif

View File

@ -635,7 +635,7 @@ int Conductor::_initializeWorkers(std::string const& suffix,
}
auto created =
AlgoRegistry::createWorker(&(_vocbaseGuard.database()), b.slice());
AlgoRegistry::createWorker(_vocbaseGuard.database(), b.slice());
TRI_ASSERT(created.get() != nullptr);
PregelFeature::instance()->addWorker(std::move(created), _executionNumber);
@ -764,7 +764,7 @@ int Conductor::_sendToAllDBServers(std::string const& path,
VPackBuilder response;
PregelFeature::handleWorkerRequest(
&(_vocbaseGuard.database()), path, message.slice(), response
_vocbaseGuard.database(), path, message.slice(), response
);
handle(response.slice());
} else {
@ -774,7 +774,7 @@ int Conductor::_sendToAllDBServers(std::string const& path,
VPackBuilder response;
PregelFeature::handleWorkerRequest(
&(_vocbaseGuard.database()), path, message.slice(), response
_vocbaseGuard.database(), path, message.slice(), response
);
});
}

View File

@ -69,7 +69,7 @@ static uint64_t TRI_totalSystemMemory() {
}
template <typename V, typename E>
GraphStore<V, E>::GraphStore(TRI_vocbase_t* vb, GraphFormat<V, E>* graphFormat)
GraphStore<V, E>::GraphStore(TRI_vocbase_t& vb, GraphFormat<V, E>* graphFormat)
: _vocbaseGuard(vb),
_graphFormat(graphFormat),
_localVerticeCount(0),

View File

@ -38,10 +38,13 @@
struct TRI_vocbase_t;
namespace arangodb {
class LogicalCollection;
namespace transaction {
class Methods;
}
namespace pregel {
template <typename T>
@ -57,7 +60,7 @@ template <typename V, typename E>
class GraphStore {
public:
GraphStore(TRI_vocbase_t* vocbase, GraphFormat<V, E>* graphFormat);
GraphStore(TRI_vocbase_t& vocbase, GraphFormat<V, E>* graphFormat);
~GraphStore();
uint64_t localVertexCount() const { return _localVerticeCount; }
@ -95,20 +98,22 @@ private:
RangeIterator<VertexEntry>& it);
std::unique_ptr<transaction::Methods> _createTransaction();
private:
DatabaseGuard _vocbaseGuard;
const std::unique_ptr<GraphFormat<V, E>> _graphFormat;
WorkerConfig* _config = nullptr;
/// Holds vertex keys and pointers to vertex data and edges
std::vector<VertexEntry> _index;
/// Vertex data
TypedBuffer<V>* _vertexData = nullptr;
/// Edges (and data)
TypedBuffer<Edge<E>>* _edges = nullptr;
// cache the amount of vertices
std::set<ShardID> _loadedShards;
// hold the current position where the ith vertex shard can
// start to write its data. At the end the offset should equal the
// sum of the counts of all ith edge shards
@ -120,6 +125,8 @@ private:
std::atomic<uint32_t> _runningThreads;
bool _destroyed = false;
};
}
}
#endif

View File

@ -193,19 +193,23 @@ void PregelFeature::handleConductorRequest(std::string const& path,
}
}
void PregelFeature::handleWorkerRequest(TRI_vocbase_t* vocbase,
std::string const& path,
VPackSlice const& body,
VPackBuilder& outBuilder) {
/*static*/ void PregelFeature::handleWorkerRequest(
TRI_vocbase_t& vocbase,
std::string const& path,
VPackSlice const& body,
VPackBuilder& outBuilder
) {
if (SchedulerFeature::SCHEDULER->isStopping()) {
return; // shutdown ongoing
}
VPackSlice sExecutionNum = body.get(Utils::executionNumberKey);
if (!sExecutionNum.isInteger()) {
THROW_ARANGO_EXCEPTION_MESSAGE(
TRI_ERROR_INTERNAL, "Worker not found, invalid execution number");
}
uint64_t exeNum = sExecutionNum.getUInt();
std::shared_ptr<IWorker> w = Instance->worker(exeNum);
@ -216,14 +220,18 @@ void PregelFeature::handleWorkerRequest(TRI_vocbase_t* vocbase,
TRI_ERROR_INTERNAL,
"Worker with this execution number already exists.");
}
Instance->addWorker(AlgoRegistry::createWorker(vocbase, body), exeNum);
Instance->worker(exeNum)->setupWorker(); // will call conductor
return;
} else if (path == Utils::startRecoveryPath) {
if (!w) {
Instance->addWorker(AlgoRegistry::createWorker(vocbase, body), exeNum);
}
Instance->worker(exeNum)->startRecovery(body);
return;
} else if (!w) {
// any other call should have a working worker instance

View File

@ -29,6 +29,7 @@
#include "Basics/Mutex.h"
struct TRI_vocbase_t;
namespace arangodb {
namespace pregel {
@ -69,10 +70,12 @@ class PregelFeature final : public application_features::ApplicationFeature {
static void handleConductorRequest(std::string const& path,
VPackSlice const& body,
VPackBuilder& outResponse);
static void handleWorkerRequest(TRI_vocbase_t* vocbase,
std::string const& path,
VPackSlice const& body,
VPackBuilder& outBuilder);
static void handleWorkerRequest(
TRI_vocbase_t& vocbase,
std::string const& path,
VPackSlice const& body,
VPackBuilder& outBuilder
);
private:
Mutex _mutex;
@ -80,6 +83,7 @@ class PregelFeature final : public application_features::ApplicationFeature {
std::unordered_map<uint64_t, std::shared_ptr<Conductor>> _conductors;
std::unordered_map<uint64_t, std::shared_ptr<IWorker>> _workers;
};
}
}

View File

@ -57,27 +57,33 @@ using namespace arangodb::pregel;
&lock, arangodb::basics::LockerType::BLOCKING, true, __FILE__, __LINE__)
template <typename V, typename E, typename M>
Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
VPackSlice initConfig)
Worker<V, E, M>::Worker(
TRI_vocbase_t& vocbase,
Algorithm<V, E, M>* algo,
VPackSlice initConfig
)
: _state(WorkerState::IDLE),
_config(vocbase, initConfig),
_config(&vocbase, initConfig),
_algorithm(algo),
_nextGSSSendMessageCount(0),
_requestedNextGSS(false) {
MUTEX_LOCKER(guard, _commandMutex);
VPackSlice userParams = initConfig.get(Utils::userParametersKey);
_workerContext.reset(algo->workerContext(userParams));
_messageFormat.reset(algo->messageFormat());
_messageCombiner.reset(algo->messageCombiner());
_conductorAggregators.reset(new AggregatorHandler(algo));
_workerAggregators.reset(new AggregatorHandler(algo));
_graphStore.reset(new GraphStore<V, E>(vocbase, _algorithm->inputFormat()));
if (_config.asynchronousMode()) {
_messageBatchSize = _algorithm->messageBatchSize(_config, _messageStats);
} else {
_messageBatchSize = 5000;
}
_initializeMessageCaches();
}

View File

@ -35,8 +35,11 @@
#include "Pregel/WorkerContext.h"
struct TRI_vocbase_t;
namespace arangodb {
class RestPregelHandler;
namespace pregel {
class IWorker {
@ -146,8 +149,11 @@ class Worker : public IWorker {
std::function<void(VPackSlice slice)> handle);
public:
Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algorithm,
VPackSlice params);
Worker(
TRI_vocbase_t& vocbase,
Algorithm<V, E, M>* algorithm,
VPackSlice params
);
~Worker();
// ====== called by rest handler =====
@ -165,6 +171,8 @@ class Worker : public IWorker {
void aqlResult(VPackBuilder*) const override;
};
}
}
#endif

View File

@ -74,7 +74,7 @@ RestStatus RestPregelHandler::execute() {
}
*/
} else if (suffix[0] == Utils::workerPrefix) {
PregelFeature::handleWorkerRequest(&_vocbase, suffix[1], body, response);
PregelFeature::handleWorkerRequest(_vocbase, suffix[1], body, response);
generateResult(rest::ResponseCode::OK, response.slice());
/* if (buffer.empty()) {

View File

@ -45,19 +45,25 @@
using namespace arangodb;
RocksDBExportCursor::RocksDBExportCursor(
TRI_vocbase_t* vocbase, std::string const& name,
CollectionExport::Restrictions const& restrictions, CursorId id,
size_t limit, size_t batchSize, double ttl, bool hasCount)
: Cursor(id, batchSize, ttl, hasCount),
_guard(vocbase),
_resolver(vocbase),
_restrictions(restrictions),
_name(name),
_trx(new SingleCollectionTransaction(
transaction::StandaloneContext::Create(vocbase), _name,
AccessMode::Type::READ)),
_position(0) {
TRI_vocbase_t& vocbase,
std::string const& name,
CollectionExport::Restrictions const& restrictions,
CursorId id,
size_t limit,
size_t batchSize,
double ttl,
bool hasCount
): Cursor(id, batchSize, ttl, hasCount),
_guard(vocbase),
_resolver(&vocbase),
_restrictions(restrictions),
_name(name),
_trx(new SingleCollectionTransaction(
transaction::StandaloneContext::Create(&vocbase),
_name,
AccessMode::Type::READ
)),
_position(0) {
Result res = _trx->begin();
if (!res.ok()) {

View File

@ -33,18 +33,25 @@
#include "VocBase/voc-types.h"
namespace arangodb {
class IndexIterator;
class SingleCollectionTransaction;
class RocksDBExportCursor final : public Cursor {
public:
RocksDBExportCursor(TRI_vocbase_t*, std::string const&,
CollectionExport::Restrictions const&, CursorId, size_t,
size_t, double, bool);
RocksDBExportCursor(
TRI_vocbase_t& vocbase,
std::string const& name,
CollectionExport::Restrictions const& restrictions,
CursorId id,
size_t limit,
size_t batchSize,
double ttl,
bool hasCount
);
~RocksDBExportCursor();
public:
CursorType type() const override final { return CURSOR_EXPORT; }
bool hasNext();
@ -67,6 +74,7 @@ class RocksDBExportCursor final : public Cursor {
size_t _position;
size_t _size;
};
}
#endif

View File

@ -111,16 +111,19 @@ TRI_vocbase_t* RocksDBReplicationContext::vocbase() const {
}
// creates new transaction/snapshot
void RocksDBReplicationContext::bind(TRI_vocbase_t* vocbase) {
void RocksDBReplicationContext::bind(TRI_vocbase_t& vocbase) {
TRI_ASSERT(_exclusive);
internalBind(vocbase);
}
void RocksDBReplicationContext::internalBind(TRI_vocbase_t* vocbase,
bool allowChange) {
if (!_trx || !_guard || (&(_guard->database()) != vocbase)) {
void RocksDBReplicationContext::internalBind(
TRI_vocbase_t& vocbase,
bool allowChange /*= true*/
) {
if (!_trx || !_guard || (&(_guard->database()) != &vocbase)) {
TRI_ASSERT(allowChange);
rocksdb::Snapshot const* snap = nullptr;
if (_trx) {
auto state = RocksDBTransactionState::toState(_trx.get());
snap = state->stealSnapshot();
@ -135,30 +138,41 @@ void RocksDBReplicationContext::internalBind(TRI_vocbase_t* vocbase,
transactionOptions.waitForSync = false;
transactionOptions.allowImplicitCollections = true;
auto ctx = transaction::StandaloneContext::Create(vocbase);
auto ctx = transaction::StandaloneContext::Create(&vocbase);
_trx.reset(
new transaction::UserTransaction(ctx, {}, {}, {}, transactionOptions));
auto state = RocksDBTransactionState::toState(_trx.get());
state->prepareForParallelReads();
if (snap != nullptr) {
state->donateSnapshot(snap);
TRI_ASSERT(snap->GetSequenceNumber() == state->sequenceNumber());
}
Result res = _trx->begin();
if (!res.ok()) {
_guard.reset();
THROW_ARANGO_EXCEPTION(res);
}
_lastTick = state->sequenceNumber();
}
}
int RocksDBReplicationContext::bindCollection(
TRI_vocbase_t* vocbase, std::string const& collectionIdentifier) {
TRI_vocbase_t& vocbase,
std::string const& collectionIdentifier
) {
TRI_ASSERT(_exclusive);
TRI_ASSERT(nullptr != _trx);
internalBind(vocbase);
TRI_voc_cid_t const id{::normalizeIdentifier(*_trx, collectionIdentifier)};
if (0 == id) {
return TRI_ERROR_BAD_PARAMETER;
}
@ -167,7 +181,9 @@ int RocksDBReplicationContext::bindCollection(
if (_collection) {
_collection->release();
}
_collection = getCollectionIterator(id);
if (nullptr == _collection) {
return TRI_ERROR_BAD_PARAMETER;
}
@ -176,11 +192,11 @@ int RocksDBReplicationContext::bindCollection(
return TRI_ERROR_NO_ERROR;
}
int RocksDBReplicationContext::chooseDatabase(TRI_vocbase_t* vocbase) {
int RocksDBReplicationContext::chooseDatabase(TRI_vocbase_t& vocbase) {
TRI_ASSERT(_users > 0);
MUTEX_LOCKER(locker, _contextLock);
if (&(_guard->database()) == vocbase) {
if (&(_guard->database()) == &vocbase) {
return TRI_ERROR_NO_ERROR; // nothing to do here
}
@ -191,6 +207,7 @@ int RocksDBReplicationContext::chooseDatabase(TRI_vocbase_t* vocbase) {
// make the actual change
internalBind(vocbase, true);
return TRI_ERROR_NO_ERROR;
}

View File

@ -75,9 +75,12 @@ class RocksDBReplicationContext {
TRI_vocbase_t* vocbase() const;
// creates new transaction/snapshot
void bind(TRI_vocbase_t*);
int bindCollection(TRI_vocbase_t*, std::string const& collectionIdentifier);
int chooseDatabase(TRI_vocbase_t*);
void bind(TRI_vocbase_t& vocbase);
int bindCollection(
TRI_vocbase_t& vocbase,
std::string const& collectionIdentifier
);
int chooseDatabase(TRI_vocbase_t& vocbase);
// returns inventory
Result getInventory(TRI_vocbase_t* vocbase, bool includeSystem,
@ -115,9 +118,8 @@ class RocksDBReplicationContext {
private:
void releaseDumpingResources();
CollectionIterator* getCollectionIterator(TRI_voc_cid_t id);
void internalBind(TRI_vocbase_t*, bool allowChange = true);
void internalBind(TRI_vocbase_t& vocbase, bool allowChange = true);
private:
mutable Mutex _contextLock;
TRI_vocbase_t* _vocbase;
TRI_server_id_t const _serverId;

View File

@ -236,9 +236,10 @@ void RocksDBRestExportHandler::createCursor() {
TRI_ASSERT(cursors != nullptr);
Cursor* c = nullptr;
{
auto cursor = std::make_unique<RocksDBExportCursor>(
&_vocbase,
_vocbase,
name,
_restrictions,
TRI_NewTickServer(),

View File

@ -86,7 +86,7 @@ void RocksDBRestReplicationHandler::handleCommandBatch() {
// create transaction+snapshot, ttl will be 300 if `ttl == 0``
auto* ctx = _manager->createContext(&_vocbase, ttl, serverId);
RocksDBReplicationContextGuard guard(_manager, ctx);
ctx->bind(&_vocbase);
ctx->bind(_vocbase);
VPackBuilder b;
b.add(VPackValue(VPackValueType::Object));
@ -473,7 +473,7 @@ void RocksDBRestReplicationHandler::handleCommandCreateKeys() {
//}
// bind collection to context - will initialize iterator
int res = ctx->bindCollection(&_vocbase, collection);
int res = ctx->bindCollection(_vocbase, collection);
if (res != TRI_ERROR_NO_ERROR) {
generateError(rest::ResponseCode::NOT_FOUND,
@ -711,7 +711,7 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
}
if (!isBusy) {
int res = context->chooseDatabase(&_vocbase);
int res = context->chooseDatabase(_vocbase);
isBusy = (TRI_ERROR_CURSOR_BUSY == res);
}

View File

@ -45,7 +45,7 @@ class DatabaseGuard {
/// @brief create guard on existing db pointer (not nullptr)
/// @deprecated DO NOT USE for new code
/// FIXME TODO remove once V8Task and arangodb::pregel::GraphStore are fixed
/// FIXME TODO remove once V8Task is fixed
explicit DatabaseGuard(TRI_vocbase_t* vocbase);
/// @brief create the guard, using a database id