1
0
Fork 0

Merge branch 'devel' of https://github.com/arangodb/arangodb into devel

This commit is contained in:
jsteemann 2017-04-25 14:41:13 +02:00
commit f4972d38ab
7 changed files with 888 additions and 320 deletions

View File

@ -75,7 +75,7 @@ class Edge {
// EdgeEntry() : _nextEntryOffset(0), _dataSize(0), _vertexIDSize(0) {}
Edge() : _targetShard(InvalidPregelShard) {}
Edge(PregelShard target, PregelKey const& key)
: _targetShard(target), _toKey(key) {}
: _targetShard(target), _toKey(key), _data(0) {}
// size_t getSize() { return sizeof(EdgeEntry) + _vertexIDSize + _dataSize; }
PregelKey const& toKey() const { return _toKey; }

View File

@ -62,7 +62,8 @@ Worker<V, E, M>::Worker(TRI_vocbase_t* vocbase, Algorithm<V, E, M>* algo,
: _state(WorkerState::IDLE),
_config(vocbase, initConfig),
_algorithm(algo),
_nextGSSSendMessageCount(0) {
_nextGSSSendMessageCount(0),
_requestedNextGSS(false) {
MUTEX_LOCKER(guard, _commandMutex);
VPackSlice userParams = initConfig.get(Utils::userParametersKey);

File diff suppressed because it is too large Load Diff

View File

@ -185,14 +185,34 @@ class InitialSyncer : public Syncer {
int handleCollectionSync(arangodb::LogicalCollection*, std::string const&,
std::string const&, TRI_voc_tick_t, std::string&);
//////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
//////////////////////////////////////////////////////////////////////////////
int handleSyncKeysRocksDB(arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
//////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch chunk data from a collection
//////////////////////////////////////////////////////////////////////////////
int syncChunkRocksDB(SingleCollectionTransaction* trx,
std::string const& keysId,
uint64_t chunkId,
std::string const& lowKey, std::string const& highKey,
std::vector<std::pair<std::string, uint64_t>> markers,
std::string& errorMsg);
//////////////////////////////////////////////////////////////////////////////
/// @brief incrementally fetch data from a collection
//////////////////////////////////////////////////////////////////////////////
int handleSyncKeys(arangodb::LogicalCollection*, std::string const&,
std::string const&, std::string const&, TRI_voc_tick_t,
std::string&);
int handleSyncKeysMMFiles(arangodb::LogicalCollection* col,
std::string const& keysId, std::string const& cid,
std::string const& collectionName, TRI_voc_tick_t maxTick,
std::string& errorMsg);
//////////////////////////////////////////////////////////////////////////////
/// @brief changes the properties of a collection, based on the VelocyPack

View File

@ -88,7 +88,6 @@ int RocksDBReplicationContext::bindCollection(
if (_collection == nullptr) {
return TRI_ERROR_BAD_PARAMETER;
RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick);
}
_trx->addCollectionAtRuntime(collectionName);
_iter = _collection->getAllIterator(_trx.get(), &_mdr,
@ -127,7 +126,10 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
if (_trx.get() == nullptr) {
return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick);
}
bindCollection(collectionName);
int res = bindCollection(collectionName);
if (res != TRI_ERROR_NO_ERROR) {
return RocksDBReplicationResult(res, _lastTick);
}
// set type
int type = 2300; // documents
@ -193,7 +195,6 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
VPackSlice highKey; // FIXME: no good keeping this
uint64_t hash = 0x012345678;
// auto cb = [&](DocumentIdentifierToken const& token, StringRef const& key) {
auto cb = [&](DocumentIdentifierToken const& token) {
bool ok = _collection->readDocument(_trx.get(), token, _mdr);
if (!ok) {
@ -218,7 +219,6 @@ arangodb::Result RocksDBReplicationContext::dumpKeyChunks(VPackBuilder& b,
b.openArray();
while (_hasMore && true /*sizelimit*/) {
try {
//_hasMore = primary->nextWithKey(cb, chunkSize);
_hasMore = primary->next(cb, chunkSize);
b.add(VPackValue(VPackValueType::Object));

View File

@ -342,14 +342,13 @@ void RocksDBRestReplicationHandler::handleCommandLoggerState() {
res.errorMessage());
return;
}
rocksdb::SequenceNumber lastTick = db->GetLatestSequenceNumber();
rocksdb::SequenceNumber lastTick = latestSequenceNumber();
// "state" part
builder.add("state", VPackValue(VPackValueType::Object));
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(lastTick)));
builder.add("lastLogTick", VPackValue(StringUtils::itoa(lastTick)));
builder.add("lastUncommittedLogTick",
VPackValue(std::to_string(lastTick + 1)));
VPackValue(StringUtils::itoa(lastTick + 1)));
builder.add("totalEvents", VPackValue(0)); // s.numEvents + s.numEventsSync
builder.add("time", VPackValue(utilities::timeString()));
builder.close();

View File

@ -19,3 +19,6 @@ threads = 20
[ssl]
keyfile = @TOP_DIR@/UnitTests/server.pem
[cluster]
system-replication-factor = 1