1
0
Fork 0

Merge branch 'engine-api' of https://github.com/arangodb/arangodb into engine-api

This commit is contained in:
jsteemann 2017-04-05 13:40:25 +02:00
commit 5880daf2a5
15 changed files with 50 additions and 22 deletions

View File

@ -2228,7 +2228,8 @@ std::unique_ptr<IndexIterator> MMFilesCollection::getAnyIterator(transaction::Me
return std::unique_ptr<IndexIterator>(primaryIndex()->anyIterator(trx, mdr));
}
void MMFilesCollection::invokeOnAllElements(std::function<bool(DocumentIdentifierToken const&)> callback){
void MMFilesCollection::invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback){
primaryIndex()->invokeOnAllElements(callback);
}

View File

@ -273,7 +273,8 @@ class MMFilesCollection final : public PhysicalCollection {
std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) override;
std::unique_ptr<IndexIterator> getAnyIterator(transaction::Methods* trx, ManagedDocumentResult* mdr) override;
void invokeOnAllElements(std::function<bool(DocumentIdentifierToken const&)> callback) override;
void invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) override;
std::shared_ptr<Index> createIndex(transaction::Methods* trx,
arangodb::velocypack::Slice const& info,

View File

@ -86,7 +86,7 @@ static std::string const RESTART_COUNTER_AGG = "aggRestart";
/** Maximum steps for the random walk, corresponds to t*. Default = 1000 */
static uint64_t const RW_ITERATIONBOUND = 10;
static const double PROFTIABILITY_DELTA = 0.3;
static const float PROFTIABILITY_DELTA = 0.3;
static const bool LOG_AGGS = false;
@ -314,7 +314,8 @@ struct DMIDComputation
float senderWeight = message->weight;
float myInfluence = senderWeight * vecLS->getAggregatedValue(this->shard(), this->key());
float myInfluence = (float)vecLS->getAggregatedValue(this->shard(), this->key());
myInfluence *= senderWeight;
/**
* hasEdgeToSender determines if sender has influence on this vertex
@ -328,7 +329,9 @@ struct DMIDComputation
* Has this vertex more influence on the sender than the
* sender on this vertex?
*/
float senderInfluence = *(edge->data()) * vecLS->getAggregatedValue(senderID.shard, senderID.key);
float senderInfluence = (float)vecLS->getAggregatedValue(senderID.shard, senderID.key);
senderInfluence *= *(edge->data());
if (myInfluence > senderInfluence) {
/** send new message */
DMIDMessage message(pregelId(), myInfluence);
@ -517,7 +520,8 @@ struct DMIDComputation
// Map.Entry<Long, Double> entry : membershipCounter.entrySet()
for (std::pair<PregelID, float> const& pair : membershipCounter) {
if ((pair.second / getEdges().size()) > *threshold) {
float const ttt = pair.second / getEdges().size();
if (ttt > *threshold) {
/** its profitable to become a member, set value */
vertexState->membershipDegree[pair.first] = 1.0 / std::pow(*iterationCounter / 3, 2);
aggregate<bool>(NEW_MEMBER_AGG, true);

View File

@ -38,7 +38,7 @@ struct DMIDMessageFormat : public MessageFormat<DMIDMessage> {
DMIDMessageFormat() {}
void unwrapValue(VPackSlice s, DMIDMessage& message) const override {
VPackArrayIterator array(s);
message.senderId.shard = (*array).getUInt();
message.senderId.shard = (PregelShard) ((*array).getUInt());
message.senderId.key = (*(++array)).copyString();
message.leaderId.shard = (PregelShard) (*array).getUInt();
message.leaderId.key = (*(++array)).copyString();

View File

@ -51,12 +51,12 @@ struct ECComputation : public VertexComputation<ECValue, int8_t, HLLCounter> {
value->counter.addNode(pregelId());
}
int32_t seenCountBefore = value->counter.getCount();
uint32_t seenCountBefore = value->counter.getCount();
for (HLLCounter const* inCounter : messages) {
value->counter.merge(*inCounter);
}
int32_t seenCountAfter = value->counter.getCount();
uint32_t seenCountAfter = value->counter.getCount();
if ((seenCountBefore != seenCountAfter) || (globalSuperstep() == 0)) {
sendMessageToAllEdges(value->counter);
}
@ -67,7 +67,7 @@ struct ECComputation : public VertexComputation<ECValue, int8_t, HLLCounter> {
// when the compute method is not invoked
if (value->shortestPaths.size() < globalSuperstep()) {
size_t i = value->shortestPaths.size();
int32_t numReachable = value->shortestPaths.back();
uint32_t numReachable = value->shortestPaths.back();
for (; i < globalSuperstep(); i++) {
value->shortestPaths.push_back(numReachable);
}
@ -107,10 +107,10 @@ struct ECGraphFormat : public GraphFormat<ECValue, int8_t> {
bool buildVertexDocument(arangodb::velocypack::Builder& b, const ECValue* ptr,
size_t size) const override {
int32_t numVerticesReachable = 0;
int32_t sumLengths = 0;
size_t numVerticesReachable = 0;
size_t sumLengths = 0;
for (size_t i = 1; i < ptr->shortestPaths.size(); i++) {
int32_t newlyReachable =
uint32_t newlyReachable =
ptr->shortestPaths[i] - ptr->shortestPaths[i - 1];
sumLengths += i * newlyReachable;
if (ptr->shortestPaths[i] > numVerticesReachable) {

View File

@ -90,7 +90,7 @@ struct HLLCounter {
/// Effective closeness value
struct ECValue {
HLLCounter counter;
std::vector<int32_t> shortestPaths;
std::vector<uint32_t> shortestPaths;
};
struct SCCValue {

View File

@ -385,9 +385,9 @@ std::unique_ptr<IndexIterator> RocksDBCollection::getAnyIterator(
primaryIndex()->anyIterator(trx, mdr));
}
void RocksDBCollection::invokeOnAllElements(
void RocksDBCollection::invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
primaryIndex()->invokeOnAllElements(trx, callback);
}
////////////////////////////////////

View File

@ -118,7 +118,7 @@ class RocksDBCollection final : public PhysicalCollection {
std::unique_ptr<IndexIterator> getAnyIterator(
transaction::Methods* trx, ManagedDocumentResult* mdr) override;
void invokeOnAllElements(
void invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) override;
////////////////////////////////////

View File

@ -640,6 +640,7 @@ void RocksDBEngine::signalCleanup(TRI_vocbase_t*) {
void RocksDBEngine::iterateDocuments(
TRI_voc_tick_t databaseId, TRI_voc_cid_t collectionId,
std::function<void(arangodb::velocypack::Slice const&)> const& cb) {
THROW_ARANGO_NOT_YET_IMPLEMENTED();
}

View File

@ -479,6 +479,22 @@ IndexIterator* RocksDBPrimaryIndex::anyIterator(transaction::Methods* trx,
return new RocksDBAnyIndexIterator(_collection, trx, mmdr, this);
}
void RocksDBPrimaryIndex::invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) {
ManagedDocumentResult mmdr;
std::unique_ptr<IndexIterator> cursor (allIterator(trx, &mmdr, false));
bool cnt = true;
auto cb = [&](DocumentIdentifierToken token) {
if (cnt) {
cnt = callback(token);
}
};
while (cursor->next(cb, 1000) && cnt) {
}
}
/// @brief create the iterator, for a single attribute, IN operator
IndexIterator* RocksDBPrimaryIndex::createInIterator(
transaction::Methods* trx, ManagedDocumentResult* mmdr,

View File

@ -192,6 +192,9 @@ class RocksDBPrimaryIndex final : public RocksDBIndex {
IndexIterator* anyIterator(transaction::Methods* trx,
ManagedDocumentResult* mmdr) const;
void invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback);
private:
/// @brief create the iterator, for a single attribute, IN operator
IndexIterator* createInIterator(transaction::Methods*, ManagedDocumentResult*,

View File

@ -931,7 +931,7 @@ void transaction::Methods::invokeOnAllElements(std::string const& collectionName
THROW_ARANGO_EXCEPTION(res);
}
logical->invokeOnAllElements(callback);
logical->invokeOnAllElements(this, callback);
res = unlock(trxCol, AccessMode::Type::READ);

View File

@ -460,8 +460,9 @@ std::unique_ptr<IndexIterator> LogicalCollection::getAnyIterator(transaction::Me
return _physical->getAnyIterator(trx, mdr);
}
void LogicalCollection::invokeOnAllElements(std::function<bool(DocumentIdentifierToken const&)> callback){
_physical->invokeOnAllElements(callback);
void LogicalCollection::invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback){
_physical->invokeOnAllElements(trx, callback);
}

View File

@ -169,7 +169,7 @@ class LogicalCollection {
std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse);
std::unique_ptr<IndexIterator> getAnyIterator(transaction::Methods* trx, ManagedDocumentResult* mdr);
void invokeOnAllElements(std::function<bool(DocumentIdentifierToken const&)> callback);
void invokeOnAllElements(transaction::Methods* trx, std::function<bool(DocumentIdentifierToken const&)> callback);
// SECTION: Indexes

View File

@ -118,7 +118,8 @@ class PhysicalCollection {
virtual std::unique_ptr<IndexIterator> getAllIterator(transaction::Methods* trx, ManagedDocumentResult* mdr, bool reverse) = 0;
virtual std::unique_ptr<IndexIterator> getAnyIterator(transaction::Methods* trx, ManagedDocumentResult* mdr) = 0;
virtual void invokeOnAllElements(std::function<bool(DocumentIdentifierToken const&)> callback) = 0;
virtual void invokeOnAllElements(transaction::Methods* trx,
std::function<bool(DocumentIdentifierToken const&)> callback) = 0;
////////////////////////////////////
// -- SECTION DML Operations --