From f52beeacb021c1fa1fbe6b058ce89a00cfe4350f Mon Sep 17 00:00:00 2001 From: Claudius Weinberger Date: Tue, 1 Sep 2015 12:38:51 +0000 Subject: [PATCH] fixed invalid index lookupus --- arangod/Indexes/EdgeIndex.cpp | 18 +++++++ arangod/Indexes/PrimaryIndex.cpp | 15 +++--- arangod/Utils/CollectionExport.cpp | 3 ++ arangod/Utils/Transaction.h | 66 +++++++++++++++---------- arangod/VocBase/document-collection.cpp | 27 ++++++---- 5 files changed, 85 insertions(+), 44 deletions(-) diff --git a/arangod/Indexes/EdgeIndex.cpp b/arangod/Indexes/EdgeIndex.cpp index 8b59881d04..404b5ff844 100644 --- a/arangod/Indexes/EdgeIndex.cpp +++ b/arangod/Indexes/EdgeIndex.cpp @@ -46,6 +46,8 @@ using namespace triagens::arango; //////////////////////////////////////////////////////////////////////////////// static uint64_t HashElementKey (void const* data) { + TRI_ASSERT_EXPENSIVE(data != nullptr); + TRI_edge_header_t const* h = static_cast(data); char const* key = h->_key; @@ -61,6 +63,8 @@ static uint64_t HashElementKey (void const* data) { static uint64_t HashElementEdgeFrom (void const* data, bool byKey) { + TRI_ASSERT_EXPENSIVE(data != nullptr); + uint64_t hash; if (! byKey) { @@ -99,6 +103,8 @@ static uint64_t HashElementEdgeFrom (void const* data, static uint64_t HashElementEdgeTo (void const* data, bool byKey) { + TRI_ASSERT_EXPENSIVE(data != nullptr); + uint64_t hash; if (! byKey) { @@ -137,6 +143,9 @@ static uint64_t HashElementEdgeTo (void const* data, static bool IsEqualKeyEdgeFrom (void const* left, void const* right) { + TRI_ASSERT_EXPENSIVE(left != nullptr); + TRI_ASSERT_EXPENSIVE(right != nullptr); + // left is a key // right is an element, that is a master pointer TRI_edge_header_t const* l = static_cast(left); @@ -170,6 +179,9 @@ static bool IsEqualKeyEdgeFrom (void const* left, static bool IsEqualKeyEdgeTo (void const* left, void const* right) { + TRI_ASSERT_EXPENSIVE(left != nullptr); + TRI_ASSERT_EXPENSIVE(right != nullptr); + // left is a key // right is an element, that is a master pointer TRI_edge_header_t const* l = static_cast(left); @@ -213,6 +225,9 @@ static bool IsEqualElementEdge (void const* left, static bool IsEqualElementEdgeFromByKey (void const* left, void const* right) { + TRI_ASSERT_EXPENSIVE(left != nullptr); + TRI_ASSERT_EXPENSIVE(right != nullptr); + char const* lKey = nullptr; char const* rKey = nullptr; TRI_voc_cid_t lCid = 0; @@ -264,6 +279,9 @@ static bool IsEqualElementEdgeFromByKey (void const* left, static bool IsEqualElementEdgeToByKey (void const* left, void const* right) { + TRI_ASSERT_EXPENSIVE(left != nullptr); + TRI_ASSERT_EXPENSIVE(right != nullptr); + char const* lKey = nullptr; char const* rKey = nullptr; TRI_voc_cid_t lCid = 0; diff --git a/arangod/Indexes/PrimaryIndex.cpp b/arangod/Indexes/PrimaryIndex.cpp index 5be47f635c..6283341e74 100644 --- a/arangod/Indexes/PrimaryIndex.cpp +++ b/arangod/Indexes/PrimaryIndex.cpp @@ -179,10 +179,10 @@ TRI_doc_mptr_t* PrimaryIndex::lookupKey (char const* key, /// Convention: step === 0 indicates a new start. //////////////////////////////////////////////////////////////////////////////// -TRI_doc_mptr_t* PrimaryIndex::lookupRandom(uint64_t& initialPosition, - uint64_t& position, - uint64_t& step, - uint64_t& total) { +TRI_doc_mptr_t* PrimaryIndex::lookupRandom (uint64_t& initialPosition, + uint64_t& position, + uint64_t& step, + uint64_t& total) { return _primaryIndex->findRandom(initialPosition, position, step, total); } @@ -193,8 +193,8 @@ TRI_doc_mptr_t* PrimaryIndex::lookupRandom(uint64_t& initialPosition, /// Convention: position === 0 indicates a new start. //////////////////////////////////////////////////////////////////////////////// -TRI_doc_mptr_t* PrimaryIndex::lookupSequential(uint64_t& position, - uint64_t& total) { +TRI_doc_mptr_t* PrimaryIndex::lookupSequential (uint64_t& position, + uint64_t& total) { return _primaryIndex->findSequential(position, total); } @@ -205,11 +205,10 @@ TRI_doc_mptr_t* PrimaryIndex::lookupSequential(uint64_t& position, /// Convention: position === UINT64_MAX indicates a new start. //////////////////////////////////////////////////////////////////////////////// -TRI_doc_mptr_t* PrimaryIndex::lookupSequentialReverse(uint64_t& position) { +TRI_doc_mptr_t* PrimaryIndex::lookupSequentialReverse (uint64_t& position) { return _primaryIndex->findSequentialReverse(position); } - //////////////////////////////////////////////////////////////////////////////// /// @brief adds a key/element to the index /// returns a status code, and *found will contain a found element (if any) diff --git a/arangod/Utils/CollectionExport.cpp b/arangod/Utils/CollectionExport.cpp index fb7f340de7..fc245c453b 100644 --- a/arangod/Utils/CollectionExport.cpp +++ b/arangod/Utils/CollectionExport.cpp @@ -138,10 +138,13 @@ void CollectionExport::run (uint64_t maxWaitTime, size_t limit) { uint64_t total = 0; while (limit > 0) { auto ptr = idx->lookupSequential(position, total); + if (ptr == nullptr) { break; } + void const* marker = ptr->getDataPtr(); + if (! TRI_IsWalDataMarkerDatafile(marker)) { _documents->emplace_back(marker); --limit; diff --git a/arangod/Utils/Transaction.h b/arangod/Utils/Transaction.h index 40aa8faf16..51f65884c7 100644 --- a/arangod/Utils/Transaction.h +++ b/arangod/Utils/Transaction.h @@ -427,17 +427,17 @@ namespace triagens { docs.reserve(batchSize); } TRI_ASSERT(batchSize > 0); - TRI_doc_mptr_t const* ptr = nullptr; + while (count < batchSize) { - ptr = document->primaryIndex()->lookupSequential(internalSkip, total); - if (ptr == nullptr) { + TRI_doc_mptr_t const* mptr = document->primaryIndex()->lookupSequential(internalSkip, total); + if (mptr == nullptr) { break; } if (skip > 0) { --skip; } else { - docs.emplace_back(*ptr); + docs.emplace_back(*mptr); if (++count >= limit) { break; } @@ -481,13 +481,14 @@ namespace triagens { uint64_t numRead = 0; TRI_ASSERT(batchSize > 0); + while (numRead < batchSize) { - auto d = document->primaryIndex()->lookupRandom(initialPosition, position, step, total); - if (d == nullptr) { + auto mptr = document->primaryIndex()->lookupRandom(initialPosition, position, step, total); + if (mptr == nullptr) { // Read all documents randomly break; } - docs.emplace_back(*d); + docs.emplace_back(*mptr); ++numRead; } this->unlock(trxCollection, TRI_TRANSACTION_READ); @@ -815,6 +816,7 @@ namespace triagens { uint64_t pos = 0; uint64_t step = 0; uint64_t total = 0; + TRI_doc_mptr_t* found = idx->lookupRandom(intPos, pos, step, total); if (found != nullptr) { *mptr = *found; @@ -847,16 +849,18 @@ namespace triagens { } auto idx = document->primaryIndex(); size_t used = idx->size(); + if (used > 0) { uint64_t step = 0; uint64_t total = 0; - TRI_doc_mptr_t const* d = nullptr; + while (true) { - d = idx->lookupSequential(step, total); - if (d == nullptr) { + TRI_doc_mptr_t const* mptr = idx->lookupSequential(step, total); + + if (mptr == nullptr) { break; } - ids.emplace_back(TRI_EXTRACT_MARKER_KEY(d)); + ids.emplace_back(TRI_EXTRACT_MARKER_KEY(mptr)); } } @@ -962,27 +966,30 @@ namespace triagens { uint64_t count = 0; auto idx = document->primaryIndex(); - TRI_doc_mptr_t const* ptr = nullptr; + TRI_doc_mptr_t const* mptr = nullptr; if (skip < 0) { uint64_t position = UINT64_MAX; do { - ptr = idx->lookupSequentialReverse(position); + mptr = idx->lookupSequentialReverse(position); ++skip; } - while (skip < 0 && ptr != nullptr); - if (ptr == nullptr) { + while (skip < 0 && mptr != nullptr); + + if (mptr == nullptr) { this->unlock(trxCollection, TRI_TRANSACTION_READ); // To few elements, skipped all return TRI_ERROR_NO_ERROR; } + do { - ptr = idx->lookupSequentialReverse(position); - if (ptr == nullptr) { + mptr = idx->lookupSequentialReverse(position); + + if (mptr == nullptr) { break; } ++count; - docs.emplace_back(*ptr); + docs.emplace_back(*mptr); } while (count < limit); @@ -992,23 +999,25 @@ namespace triagens { uint64_t position = 0; while (skip > 0) { - ptr = idx->lookupSequential(position, total); + mptr = idx->lookupSequential(position, total); --skip; - if (ptr == nullptr) { + if (mptr == nullptr) { // To few elements, skipped all this->unlock(trxCollection, TRI_TRANSACTION_READ); return TRI_ERROR_NO_ERROR; } } + do { - ptr = idx->lookupSequential(position, total); - if (ptr == nullptr) { + mptr = idx->lookupSequential(position, total); + if (mptr == nullptr) { break; } ++count; - docs.emplace_back(*ptr); + docs.emplace_back(*mptr); } while (count < limit); + this->unlock(trxCollection, TRI_TRANSACTION_READ); return TRI_ERROR_NO_ERROR; } @@ -1034,15 +1043,18 @@ namespace triagens { uint64_t position = 0; uint64_t total = 0; auto idx = document->primaryIndex(); - TRI_doc_mptr_t const* ptr = nullptr; docs.reserve(idx->size()); + while (true) { - ptr = idx->lookupSequential(position, total); - if (ptr == nullptr) { + TRI_doc_mptr_t const* mptr = idx->lookupSequential(position, total); + + if (mptr == nullptr) { break; } - docs.emplace_back(ptr); + + docs.emplace_back(mptr); } + this->unlock(trxCollection, TRI_TRANSACTION_READ); return TRI_ERROR_NO_ERROR; } diff --git a/arangod/VocBase/document-collection.cpp b/arangod/VocBase/document-collection.cpp index b1d3898bfd..e267358946 100644 --- a/arangod/VocBase/document-collection.cpp +++ b/arangod/VocBase/document-collection.cpp @@ -2585,10 +2585,11 @@ size_t TRI_DocumentIteratorDocumentCollection (TransactionBase const*, if (nrUsed > 0) { uint64_t position = 0; uint64_t total = 0; - TRI_doc_mptr_t const* ptr = nullptr; + while (true) { - ptr = idx->lookupSequential(position, total); - if (ptr == nullptr || ! callback(ptr, document, data)) { + TRI_doc_mptr_t const* mptr = idx->lookupSequential(position, total); + + if (mptr == nullptr || ! callback(mptr, document, data)) { break; } } @@ -3191,13 +3192,19 @@ static int FillIndexBatch (TRI_document_collection_t* document, std::vector documents; documents.reserve(blockSize); + if (nrUsed > 0) { uint64_t position = 0; uint64_t total = 0; - TRI_doc_mptr_t const* mptr; - do { - mptr = primaryIndex->lookupSequential(position, total); + while (true) { + TRI_doc_mptr_t const* mptr = primaryIndex->lookupSequential(position, total); + + if (mptr == nullptr) { + break; + } + documents.emplace_back(mptr); + if (documents.size() == blockSize) { res = idx->batchInsert(&documents, indexPool->numThreads()); documents.clear(); @@ -3208,7 +3215,6 @@ static int FillIndexBatch (TRI_document_collection_t* document, } } } - while (mptr != nullptr); } // process the remainder of the documents @@ -3257,13 +3263,16 @@ static int FillIndexSequential (TRI_document_collection_t* document, if (nrUsed > 0) { uint64_t position = 0; uint64_t total = 0; - TRI_doc_mptr_t const* mptr = nullptr; + while (true) { - mptr = primaryIndex->lookupSequential(position, total); + TRI_doc_mptr_t const* mptr = primaryIndex->lookupSequential(position, total); + if (mptr == nullptr) { break; } + int res = idx->insert(mptr, false); + if (res != TRI_ERROR_NO_ERROR) { return res; }