diff --git a/arangod/Indexes/PrimaryIndex.cpp b/arangod/Indexes/PrimaryIndex.cpp index 8e8f782256..9b4363d5d4 100644 --- a/arangod/Indexes/PrimaryIndex.cpp +++ b/arangod/Indexes/PrimaryIndex.cpp @@ -177,30 +177,8 @@ int PrimaryIndex::remove (TRI_doc_mptr_t const*, /// @brief looks up an element given a key //////////////////////////////////////////////////////////////////////////////// -void* PrimaryIndex::lookupKey (char const* key) const { - if (_primaryIndex._nrUsed == 0) { - return nullptr; - } - - // compute the hash - uint64_t const hash = calculateHash(key); - uint64_t const n = _primaryIndex._nrAlloc; - uint64_t i, k; - - i = k = hash % n; - - TRI_ASSERT_EXPENSIVE(n > 0); - - // search the table - for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i); - if (i == n) { - for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i); - } - - TRI_ASSERT_EXPENSIVE(i < n); - - // return whatever we found - return _primaryIndex._table[i]; +TRI_doc_mptr_t* PrimaryIndex::lookupKey (char const* key) const { + return _primaryIndex->findByKey(key); } //////////////////////////////////////////////////////////////////////////////// @@ -209,7 +187,7 @@ void* PrimaryIndex::lookupKey (char const* key) const { /// parameter. sets position to UINT64_MAX if the position cannot be determined //////////////////////////////////////////////////////////////////////////////// -void* PrimaryIndex::lookupKey (char const* key, +TRI_doc_mptr_t* PrimaryIndex::lookupKey (char const* key, uint64_t& position) const { if (_primaryIndex._nrUsed == 0) { position = UINT64_MAX; @@ -238,6 +216,33 @@ void* PrimaryIndex::lookupKey (char const* key, return _primaryIndex._table[i]; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief a method to iterate over all elements in the index in +/// a random order. +/// Returns nullptr if all documents have been returned. +/// Convention: step === 0 indicates a new start. +//////////////////////////////////////////////////////////////////////////////// + +TRI_doc_mptr_t* PrimaryIndex::lookupRandom(uint64_t& initialPosition, + uint64_t& position, + uint64_t* step, + uint64_t* total) { + return _primaryIndex->findRandom(initialPosition, position, step, total); +} + +//////////////////////////////////////////////////////////////////////////////// +/// @brief a method to iterate over all elements in the index in +/// a sequential order. +/// Returns nullptr if all documents have been returned. +/// Convention: position === 0 indicates a new start. +//////////////////////////////////////////////////////////////////////////////// + +TRI_doc_mptr_t* PrimaryIndex::findSequential(uint64_t& position, + uint64_t* total) { + return _primaryIndex->findSequential(position, total); +} + + //////////////////////////////////////////////////////////////////////////////// /// @brief adds a key/element to the index /// returns a status code, and *found will contain a found element (if any) @@ -246,42 +251,11 @@ void* PrimaryIndex::lookupKey (char const* key, int PrimaryIndex::insertKey (TRI_doc_mptr_t const* header, void const** found) { *found = nullptr; - - if (shouldResize()) { - // check for out-of-memory - if (! resize(static_cast(2 * _primaryIndex._nrAlloc + 1), false)) { - return TRI_ERROR_OUT_OF_MEMORY; - } + int res = _primaryIndex->insert(TRI_EXTRACT_MARKER_KEY(header), header, false); + if (res === TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED) { + *found = _primaryIndex->find(header); } - - uint64_t const n = _primaryIndex._nrAlloc; - uint64_t i, k; - - TRI_ASSERT_EXPENSIVE(n > 0); - - i = k = header->_hash % n; - - for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i); - if (i == n) { - for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i); - } - - TRI_ASSERT_EXPENSIVE(i < n); - - void* old = _primaryIndex._table[i]; - - // if we found an element, return - if (old != nullptr) { - *found = old; - - return TRI_ERROR_NO_ERROR; - } - - // add a new element to the associative idx - _primaryIndex._table[i] = (void*) header; - ++_primaryIndex._nrUsed; - - return TRI_ERROR_NO_ERROR; + return res; } //////////////////////////////////////////////////////////////////////////////// @@ -291,22 +265,7 @@ int PrimaryIndex::insertKey (TRI_doc_mptr_t const* header, //////////////////////////////////////////////////////////////////////////////// void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header) { - uint64_t const n = _primaryIndex._nrAlloc; - uint64_t i, k; - - i = k = header->_hash % n; - - for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i); - if (i == n) { - for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i); - } - - TRI_ASSERT_EXPENSIVE(i < n); - - TRI_ASSERT_EXPENSIVE(_primaryIndex._table[i] == nullptr); - - _primaryIndex._table[i] = const_cast(static_cast(header)); - ++_primaryIndex._nrUsed; + _primaryIndex->insert(TRI_EXTRACT_MARKER_KEY(header), header, false); } //////////////////////////////////////////////////////////////////////////////// @@ -317,27 +276,16 @@ void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header) { void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header, uint64_t slot) { - uint64_t const n = _primaryIndex._nrAlloc; - uint64_t i, k; - - if (slot != UINT64_MAX) { - i = k = slot; - } - else { - i = k = header->_hash % n; - } - - for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i); - if (i == n) { - for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentKeyElement(header, _primaryIndex._table[i]); ++i); - } - - TRI_ASSERT_EXPENSIVE(i < n); - - TRI_ASSERT_EXPENSIVE(_primaryIndex._table[i] == nullptr); - - _primaryIndex._table[i] = const_cast(static_cast(header)); - ++_primaryIndex._nrUsed; + _primaryIndex->insert(TRI_EXTRACT_MARKER_KEY(header), header, false); + // TODO slot is hint where to insert the element. It is not yet used + // + // if (slot != UINT64_MAX) { + // i = k = slot; + // } + // else { + // i = k = header->_hash % n; + // } + // } //////////////////////////////////////////////////////////////////////////////// @@ -345,51 +293,7 @@ void PrimaryIndex::insertKey (TRI_doc_mptr_t const* header, //////////////////////////////////////////////////////////////////////////////// void* PrimaryIndex::removeKey (char const* key) { - uint64_t const hash = calculateHash(key); - uint64_t const n = _primaryIndex._nrAlloc; - uint64_t i, k; - - i = k = hash % n; - - // search the table - for (; i < n && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i); - if (i == n) { - for (i = 0; i < k && _primaryIndex._table[i] != nullptr && IsDifferentHashElement(key, hash, _primaryIndex._table[i]); ++i); - } - - TRI_ASSERT_EXPENSIVE(i < n); - - // if we did not find such an item return false - if (_primaryIndex._table[i] == nullptr) { - return nullptr; - } - - // remove item - void* old = _primaryIndex._table[i]; - _primaryIndex._table[i] = nullptr; - _primaryIndex._nrUsed--; - - // and now check the following places for items to move here - k = TRI_IncModU64(i, n); - - while (_primaryIndex._table[k] != nullptr) { - uint64_t j = (static_cast(_primaryIndex._table[k])->_hash) % n; - - if ((i < k && ! (i < j && j <= k)) || (k < i && ! (i < j || j <= k))) { - _primaryIndex._table[i] = _primaryIndex._table[k]; - _primaryIndex._table[k] = nullptr; - i = k; - } - - k = TRI_IncModU64(k, n); - } - - if (_primaryIndex._nrUsed == 0) { - resize(InitialSize, true); - } - - // return success - return old; + return _primaryIndex->remove(key); } //////////////////////////////////////////////////////////////////////////////// @@ -397,10 +301,7 @@ void* PrimaryIndex::removeKey (char const* key) { //////////////////////////////////////////////////////////////////////////////// int PrimaryIndex::resize (size_t targetSize) { - if (! resize(static_cast(2 * targetSize + 1), false)) { - return TRI_ERROR_OUT_OF_MEMORY; - } - return TRI_ERROR_NO_ERROR; + return _primaryIndex->resize(targetSize); } //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/Indexes/PrimaryIndex.h b/arangod/Indexes/PrimaryIndex.h index c3cb7252e5..b877a9d77a 100644 --- a/arangod/Indexes/PrimaryIndex.h +++ b/arangod/Indexes/PrimaryIndex.h @@ -31,6 +31,7 @@ #define ARANGODB_INDEXES_PRIMARY_INDEX_H 1 #include "Basics/Common.h" +#include "Basics/AssocUnique.h" #include "Indexes/Index.h" #include "VocBase/vocbase.h" #include "VocBase/voc-types.h" @@ -92,11 +93,11 @@ namespace triagens { triagens::basics::Json toJson (TRI_memory_zone_t*, bool) const override final; triagens::basics::Json toJsonFigures (TRI_memory_zone_t*) const override final; - int insert (struct TRI_doc_mptr_t const*, bool) override final; - - int remove (struct TRI_doc_mptr_t const*, bool) override final; + int insert (TRI_doc_mptr_t const*, bool) override final; - void* lookupKey (char const*) const; + int remove (TRI_doc_mptr_t const*, bool) override final; + + TRI_doc_mptr_t* lookupKey (char const*) const; //////////////////////////////////////////////////////////////////////////////// /// @brief looks up an element given a key @@ -104,10 +105,32 @@ namespace triagens { /// parameter. sets position to UINT64_MAX if the position cannot be determined //////////////////////////////////////////////////////////////////////////////// - void* lookupKey (char const*, uint64_t&) const; + TRI_doc_mptr_t* lookupKey (char const*, uint64_t&) const; - int insertKey (struct TRI_doc_mptr_t const*, void const**); - void insertKey (struct TRI_doc_mptr_t const*); +//////////////////////////////////////////////////////////////////////////////// +/// @brief a method to iterate over all elements in the index in +/// a random order. +/// Returns nullptr if all documents have been returned. +/// Convention: step === 0 indicates a new start. +//////////////////////////////////////////////////////////////////////////////// + + TRI_doc_mptr_t* lookupRandom(uint64_t& initialPosition, + uint64_t& position, + uint64_t* step, + uint64_t* total); + +//////////////////////////////////////////////////////////////////////////////// +/// @brief a method to iterate over all elements in the index in +/// a sequential order. +/// Returns nullptr if all documents have been returned. +/// Convention: position === 0 indicates a new start. +//////////////////////////////////////////////////////////////////////////////// + + TRI_doc_mptr_t* findSequential(uint64_t& position, + uint64_t* total); + + int insertKey (TRI_doc_mptr_t const*, void const**); + void insertKey (TRI_doc_mptr_t const*); //////////////////////////////////////////////////////////////////////////////// /// @brief adds a key/element to the index @@ -117,7 +140,7 @@ namespace triagens { void insertKey (struct TRI_doc_mptr_t const*, uint64_t); - void* removeKey (char const*); + TRI_doc_mptr_t* removeKey (char const*); int resize (size_t); int resize (); diff --git a/arangod/Utils/Transaction.h b/arangod/Utils/Transaction.h index 37aa0a05b8..27f4a15eac 100644 --- a/arangod/Utils/Transaction.h +++ b/arangod/Utils/Transaction.h @@ -488,68 +488,30 @@ namespace triagens { uint64_t batchSize, uint64_t* step, uint64_t* total) { - if (initialPosition > 0 && position == initialPosition) { - // already read all documents - return TRI_ERROR_NO_ERROR; - } - TRI_document_collection_t* document = documentCollection(trxCollection); - // READ-LOCK START int res = this->lock(trxCollection, TRI_TRANSACTION_READ); if (res != TRI_ERROR_NO_ERROR) { return res; } - - auto primaryIndex = document->primaryIndex()->internals(); - if (primaryIndex->_nrUsed == 0) { - // nothing to do - this->unlock(trxCollection, TRI_TRANSACTION_READ); - - // READ-LOCK END - *total = 0; - return TRI_ERROR_NO_ERROR; - } - if (orderDitch(trxCollection) == nullptr) { return TRI_ERROR_OUT_OF_MEMORY; } - *total = primaryIndex->_nrAlloc; - if (*step == 0) { - TRI_ASSERT(initialPosition == 0); - - // find a co-prime for total - while (true) { - *step = TRI_UInt32Random() % *total; - if (*step > 10 && triagens::basics::binaryGcd(*total, *step) == 1) { - while (initialPosition == 0) { - initialPosition = TRI_UInt32Random() % *total; - } - position = initialPosition; - break; - } - } - } - uint64_t numRead = 0; - do { - auto d = static_cast(primaryIndex->_table[position]); - - if (d != nullptr) { - docs.emplace_back(*d); - ++numRead; + TRI_ASSERT(batchSize > 0); + while (numRead < batchSize) { + auto d = document->primaryIndex()->lookupRandom(initialPosition, position, step, total); + if (d == nullptr) { + // Read all documents randomly + break; } - - position += *step; - position = position % *total; + docs.emplace_back(*d); + ++numRead; } - while (numRead < batchSize && position != initialPosition); - this->unlock(trxCollection, TRI_TRANSACTION_READ); // READ-LOCK END - return TRI_ERROR_NO_ERROR; }