1
0
Fork 0

Piped through readRandom and readSequential in PrimaryIndex. Used in Transaction

This commit is contained in:
Michael Hackstein 2015-08-27 11:54:51 +02:00
parent 48240bccc1
commit 0e90f396d7
3 changed files with 86 additions and 200 deletions

View File

@ -177,30 +177,8 @@ int PrimaryIndex::remove (TRI_doc_mptr_t const*,
/// @brief looks up an element given a key
////////////////////////////////////////////////////////////////////////////////
void* PrimaryIndex::lookupKey (char const* key) const {
if (_primaryIndex._nrUsed == 0) {
return nullptr;
}
// compute the hash
uint64_t const hash = calculateHash(key);
uint64_t const n = _primaryIndex._nrAlloc;
uint64_t i, k;
i = k = hash % n;
TRI_ASSERT_EXPENSIVE(n > 0);
// search the table
for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i);
if (i == n) {
for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i);
}
TRI_ASSERT_EXPENSIVE(i < n);
// return whatever we found
return _primaryIndex._table[i];
TRI_doc_mptr_t* PrimaryIndex::lookupKey (char const* key) const {
return _primaryIndex->findByKey(key);
}
////////////////////////////////////////////////////////////////////////////////
@ -209,7 +187,7 @@ void* PrimaryIndex::lookupKey (char const* key) const {
/// parameter. sets position to UINT64_MAX if the position cannot be determined
////////////////////////////////////////////////////////////////////////////////
void* PrimaryIndex::lookupKey (char const* key,
TRI_doc_mptr_t* PrimaryIndex::lookupKey (char const* key,
uint64_t& position) const {
if (_primaryIndex._nrUsed == 0) {
position = UINT64_MAX;
@ -238,6 +216,33 @@ void* PrimaryIndex::lookupKey (char const* key,
return _primaryIndex._table[i];
}
////////////////////////////////////////////////////////////////////////////////
/// @brief a method to iterate over all elements in the index in
/// a random order.
/// Returns nullptr if all documents have been returned.
/// Convention: step === 0 indicates a new start.
////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* PrimaryIndex::lookupRandom(uint64_t& initialPosition,
uint64_t& position,
uint64_t* step,
uint64_t* total) {
return _primaryIndex->findRandom(initialPosition, position, step, total);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief a method to iterate over all elements in the index in
/// a sequential order.
/// Returns nullptr if all documents have been returned.
/// Convention: position === 0 indicates a new start.
////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* PrimaryIndex::findSequential(uint64_t& position,
uint64_t* total) {
return _primaryIndex->findSequential(position, total);
}
////////////////////////////////////////////////////////////////////////////////
/// @brief adds a key/element to the index
/// returns a status code, and *found will contain a found element (if any)
@ -246,42 +251,11 @@ void* PrimaryIndex::lookupKey (char const* key,
int PrimaryIndex::insertKey (TRI_doc_mptr_t const* header,
void const** found) {
*found = nullptr;
if (shouldResize()) {
// check for out-of-memory
if (! resize(static_cast<uint64_t>(2 * _primaryIndex._nrAlloc + 1), false)) {
return TRI_ERROR_OUT_OF_MEMORY;
}
int res = _primaryIndex->insert(TRI_EXTRACT_MARKER_KEY(header), header, false);
if (res === TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) {
*found = _primaryIndex->find(header);
}
uint64_t const n = _primaryIndex._nrAlloc;
uint64_t i, k;
TRI_ASSERT_EXPENSIVE(n > 0);
i = k = header->_hash % n;
for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i);
if (i == n) {
for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i);
}
TRI_ASSERT_EXPENSIVE(i < n);
void* old = _primaryIndex._table[i];
// if we found an element, return
if (old != nullptr) {
*found = old;
return TRI_ERROR_NO_ERROR;
}
// add a new element to the associative idx
_primaryIndex._table[i] = (void*) header;
++_primaryIndex._nrUsed;
return TRI_ERROR_NO_ERROR;
return res;
}
////////////////////////////////////////////////////////////////////////////////
@ -291,22 +265,7 @@ int PrimaryIndex::insertKey (TRI_doc_mptr_t const* header,
////////////////////////////////////////////////////////////////////////////////
void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header) {
uint64_t const n = _primaryIndex._nrAlloc;
uint64_t i, k;
i = k = header->_hash % n;
for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i);
if (i == n) {
for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i);
}
TRI_ASSERT_EXPENSIVE(i < n);
TRI_ASSERT_EXPENSIVE(_primaryIndex._table[i] == nullptr);
_primaryIndex._table[i] = const_cast<void*>(static_cast<void const*>(header));
++_primaryIndex._nrUsed;
_primaryIndex->insert(TRI_EXTRACT_MARKER_KEY(header), header, false);
}
////////////////////////////////////////////////////////////////////////////////
@ -317,27 +276,16 @@ void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header) {
void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header,
uint64_t slot) {
uint64_t const n = _primaryIndex._nrAlloc;
uint64_t i, k;
if (slot != UINT64_MAX) {
i = k = slot;
}
else {
i = k = header->_hash % n;
}
for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i);
if (i == n) {
for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i);
}
TRI_ASSERT_EXPENSIVE(i < n);
TRI_ASSERT_EXPENSIVE(_primaryIndex._table[i] == nullptr);
_primaryIndex._table[i] = const_cast<void*>(static_cast<void const*>(header));
++_primaryIndex._nrUsed;
_primaryIndex->insert(TRI_EXTRACT_MARKER_KEY(header), header, false);
// TODO slot is hint where to insert the element. It is not yet used
//
// if (slot != UINT64_MAX) {
// i = k = slot;
// }
// else {
// i = k = header->_hash % n;
// }
//
}
////////////////////////////////////////////////////////////////////////////////
@ -345,51 +293,7 @@ void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header,
////////////////////////////////////////////////////////////////////////////////
void* PrimaryIndex::removeKey (char const* key) {
uint64_t const hash = calculateHash(key);
uint64_t const n = _primaryIndex._nrAlloc;
uint64_t i, k;
i = k = hash % n;
// search the table
for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i);
if (i == n) {
for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i);
}
TRI_ASSERT_EXPENSIVE(i < n);
// if we did not find such an item return false
if (_primaryIndex._table[i] == nullptr) {
return nullptr;
}
// remove item
void* old = _primaryIndex._table[i];
_primaryIndex._table[i] = nullptr;
_primaryIndex._nrUsed--;
// and now check the following places for items to move here
k = TRI_IncModU64(i, n);
while (_primaryIndex._table[k] != nullptr) {
uint64_t j = (static_cast<TRI_doc_mptr_t const*>(_primaryIndex._table[k])->_hash) % n;
if ((i < k && ! (i < j && j <= k)) || (k < i && ! (i < j || j <= k))) {
_primaryIndex._table[i] = _primaryIndex._table[k];
_primaryIndex._table[k] = nullptr;
i = k;
}
k = TRI_IncModU64(k, n);
}
if (_primaryIndex._nrUsed == 0) {
resize(InitialSize, true);
}
// return success
return old;
return _primaryIndex->remove(key);
}
////////////////////////////////////////////////////////////////////////////////
@ -397,10 +301,7 @@ void* PrimaryIndex::removeKey (char const* key) {
////////////////////////////////////////////////////////////////////////////////
int PrimaryIndex::resize (size_t targetSize) {
if (! resize(static_cast<uint64_t>(2 * targetSize + 1), false)) {
return TRI_ERROR_OUT_OF_MEMORY;
}
return TRI_ERROR_NO_ERROR;
return _primaryIndex->resize(targetSize);
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,7 @@
#define ARANGODB_INDEXES_PRIMARY_INDEX_H 1
#include "Basics/Common.h"
#include "Basics/AssocUnique.h"
#include "Indexes/Index.h"
#include "VocBase/vocbase.h"
#include "VocBase/voc-types.h"
@ -92,11 +93,11 @@ namespace triagens {
triagens::basics::Json toJson (TRI_memory_zone_t*, bool) const override final;
triagens::basics::Json toJsonFigures (TRI_memory_zone_t*) const override final;
int insert (struct TRI_doc_mptr_t const*, bool) override final;
int remove (struct TRI_doc_mptr_t const*, bool) override final;
int insert (TRI_doc_mptr_t const*, bool) override final;
void* lookupKey (char const*) const;
int remove (TRI_doc_mptr_t const*, bool) override final;
TRI_doc_mptr_t* lookupKey (char const*) const;
////////////////////////////////////////////////////////////////////////////////
/// @brief looks up an element given a key
@ -104,10 +105,32 @@ namespace triagens {
/// parameter. sets position to UINT64_MAX if the position cannot be determined
////////////////////////////////////////////////////////////////////////////////
void* lookupKey (char const*, uint64_t&) const;
TRI_doc_mptr_t* lookupKey (char const*, uint64_t&) const;
int insertKey (struct TRI_doc_mptr_t const*, void const**);
void insertKey (struct TRI_doc_mptr_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief a method to iterate over all elements in the index in
/// a random order.
/// Returns nullptr if all documents have been returned.
/// Convention: step === 0 indicates a new start.
////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* lookupRandom(uint64_t& initialPosition,
uint64_t& position,
uint64_t* step,
uint64_t* total);
////////////////////////////////////////////////////////////////////////////////
/// @brief a method to iterate over all elements in the index in
/// a sequential order.
/// Returns nullptr if all documents have been returned.
/// Convention: position === 0 indicates a new start.
////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* findSequential(uint64_t& position,
uint64_t* total);
int insertKey (TRI_doc_mptr_t const*, void const**);
void insertKey (TRI_doc_mptr_t const*);
////////////////////////////////////////////////////////////////////////////////
/// @brief adds a key/element to the index
@ -117,7 +140,7 @@ namespace triagens {
void insertKey (struct TRI_doc_mptr_t const*, uint64_t);
void* removeKey (char const*);
TRI_doc_mptr_t* removeKey (char const*);
int resize (size_t);
int resize ();

View File

@ -488,68 +488,30 @@ namespace triagens {
uint64_t batchSize,
uint64_t* step,
uint64_t* total) {
if (initialPosition > 0 && position == initialPosition) {
// already read all documents
return TRI_ERROR_NO_ERROR;
}
TRI_document_collection_t* document = documentCollection(trxCollection);
// READ-LOCK START
int res = this->lock(trxCollection, TRI_TRANSACTION_READ);
if (res != TRI_ERROR_NO_ERROR) {
return res;
}
auto primaryIndex = document->primaryIndex()->internals();
if (primaryIndex->_nrUsed == 0) {
// nothing to do
this->unlock(trxCollection, TRI_TRANSACTION_READ);
// READ-LOCK END
*total = 0;
return TRI_ERROR_NO_ERROR;
}
if (orderDitch(trxCollection) == nullptr) {
return TRI_ERROR_OUT_OF_MEMORY;
}
*total = primaryIndex->_nrAlloc;
if (*step == 0) {
TRI_ASSERT(initialPosition == 0);
// find a co-prime for total
while (true) {
*step = TRI_UInt32Random() % *total;
if (*step > 10 && triagens::basics::binaryGcd<uint64_t>(*total, *step) == 1) {
while (initialPosition == 0) {
initialPosition = TRI_UInt32Random() % *total;
}
position = initialPosition;
break;
}
}
}
uint64_t numRead = 0;
do {
auto d = static_cast<TRI_doc_mptr_t*>(primaryIndex->_table[position]);
if (d != nullptr) {
docs.emplace_back(*d);
++numRead;
TRI_ASSERT(batchSize > 0);
while (numRead < batchSize) {
auto d = document->primaryIndex()->lookupRandom(initialPosition, position, step, total);
if (d == nullptr) {
// Read all documents randomly
break;
}
position += *step;
position = position % *total;
docs.emplace_back(*d);
++numRead;
}
while (numRead < batchSize && position != initialPosition);
this->unlock(trxCollection, TRI_TRANSACTION_READ);
// READ-LOCK END
return TRI_ERROR_NO_ERROR;
}