1
0
Fork 0

fixed invalid index lookupus

This commit is contained in:
Claudius Weinberger 2015-09-01 12:38:51 +00:00
parent 25a52dac53
commit f52beeacb0
5 changed files with 85 additions and 44 deletions

View File

@ -46,6 +46,8 @@ using namespace triagens::arango;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
static uint64_t HashElementKey (void const* data) { static uint64_t HashElementKey (void const* data) {
TRI_ASSERT_EXPENSIVE(data != nullptr);
TRI_edge_header_t const* h = static_cast<TRI_edge_header_t const*>(data); TRI_edge_header_t const* h = static_cast<TRI_edge_header_t const*>(data);
char const* key = h->_key; char const* key = h->_key;
@ -61,6 +63,8 @@ static uint64_t HashElementKey (void const* data) {
static uint64_t HashElementEdgeFrom (void const* data, static uint64_t HashElementEdgeFrom (void const* data,
bool byKey) { bool byKey) {
TRI_ASSERT_EXPENSIVE(data != nullptr);
uint64_t hash; uint64_t hash;
if (! byKey) { if (! byKey) {
@ -99,6 +103,8 @@ static uint64_t HashElementEdgeFrom (void const* data,
static uint64_t HashElementEdgeTo (void const* data, static uint64_t HashElementEdgeTo (void const* data,
bool byKey) { bool byKey) {
TRI_ASSERT_EXPENSIVE(data != nullptr);
uint64_t hash; uint64_t hash;
if (! byKey) { if (! byKey) {
@ -137,6 +143,9 @@ static uint64_t HashElementEdgeTo (void const* data,
static bool IsEqualKeyEdgeFrom (void const* left, static bool IsEqualKeyEdgeFrom (void const* left,
void const* right) { void const* right) {
TRI_ASSERT_EXPENSIVE(left != nullptr);
TRI_ASSERT_EXPENSIVE(right != nullptr);
// left is a key // left is a key
// right is an element, that is a master pointer // right is an element, that is a master pointer
TRI_edge_header_t const* l = static_cast<TRI_edge_header_t const*>(left); TRI_edge_header_t const* l = static_cast<TRI_edge_header_t const*>(left);
@ -170,6 +179,9 @@ static bool IsEqualKeyEdgeFrom (void const* left,
static bool IsEqualKeyEdgeTo (void const* left, static bool IsEqualKeyEdgeTo (void const* left,
void const* right) { void const* right) {
TRI_ASSERT_EXPENSIVE(left != nullptr);
TRI_ASSERT_EXPENSIVE(right != nullptr);
// left is a key // left is a key
// right is an element, that is a master pointer // right is an element, that is a master pointer
TRI_edge_header_t const* l = static_cast<TRI_edge_header_t const*>(left); TRI_edge_header_t const* l = static_cast<TRI_edge_header_t const*>(left);
@ -213,6 +225,9 @@ static bool IsEqualElementEdge (void const* left,
static bool IsEqualElementEdgeFromByKey (void const* left, static bool IsEqualElementEdgeFromByKey (void const* left,
void const* right) { void const* right) {
TRI_ASSERT_EXPENSIVE(left != nullptr);
TRI_ASSERT_EXPENSIVE(right != nullptr);
char const* lKey = nullptr; char const* lKey = nullptr;
char const* rKey = nullptr; char const* rKey = nullptr;
TRI_voc_cid_t lCid = 0; TRI_voc_cid_t lCid = 0;
@ -264,6 +279,9 @@ static bool IsEqualElementEdgeFromByKey (void const* left,
static bool IsEqualElementEdgeToByKey (void const* left, static bool IsEqualElementEdgeToByKey (void const* left,
void const* right) { void const* right) {
TRI_ASSERT_EXPENSIVE(left != nullptr);
TRI_ASSERT_EXPENSIVE(right != nullptr);
char const* lKey = nullptr; char const* lKey = nullptr;
char const* rKey = nullptr; char const* rKey = nullptr;
TRI_voc_cid_t lCid = 0; TRI_voc_cid_t lCid = 0;

View File

@ -179,10 +179,10 @@ TRI_doc_mptr_t* PrimaryIndex::lookupKey (char const* key,
/// Convention: step === 0 indicates a new start. /// Convention: step === 0 indicates a new start.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* PrimaryIndex::lookupRandom(uint64_t& initialPosition, TRI_doc_mptr_t* PrimaryIndex::lookupRandom (uint64_t& initialPosition,
uint64_t& position, uint64_t& position,
uint64_t& step, uint64_t& step,
uint64_t& total) { uint64_t& total) {
return _primaryIndex->findRandom(initialPosition, position, step, total); return _primaryIndex->findRandom(initialPosition, position, step, total);
} }
@ -193,8 +193,8 @@ TRI_doc_mptr_t* PrimaryIndex::lookupRandom(uint64_t& initialPosition,
/// Convention: position === 0 indicates a new start. /// Convention: position === 0 indicates a new start.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* PrimaryIndex::lookupSequential(uint64_t& position, TRI_doc_mptr_t* PrimaryIndex::lookupSequential (uint64_t& position,
uint64_t& total) { uint64_t& total) {
return _primaryIndex->findSequential(position, total); return _primaryIndex->findSequential(position, total);
} }
@ -205,11 +205,10 @@ TRI_doc_mptr_t* PrimaryIndex::lookupSequential(uint64_t& position,
/// Convention: position === UINT64_MAX indicates a new start. /// Convention: position === UINT64_MAX indicates a new start.
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
TRI_doc_mptr_t* PrimaryIndex::lookupSequentialReverse(uint64_t& position) { TRI_doc_mptr_t* PrimaryIndex::lookupSequentialReverse (uint64_t& position) {
return _primaryIndex->findSequentialReverse(position); return _primaryIndex->findSequentialReverse(position);
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
/// @brief adds a key/element to the index /// @brief adds a key/element to the index
/// returns a status code, and *found will contain a found element (if any) /// returns a status code, and *found will contain a found element (if any)

View File

@ -138,10 +138,13 @@ void CollectionExport::run (uint64_t maxWaitTime, size_t limit) {
uint64_t total = 0; uint64_t total = 0;
while (limit > 0) { while (limit > 0) {
auto ptr = idx->lookupSequential(position, total); auto ptr = idx->lookupSequential(position, total);
if (ptr == nullptr) { if (ptr == nullptr) {
break; break;
} }
void const* marker = ptr->getDataPtr(); void const* marker = ptr->getDataPtr();
if (! TRI_IsWalDataMarkerDatafile(marker)) { if (! TRI_IsWalDataMarkerDatafile(marker)) {
_documents->emplace_back(marker); _documents->emplace_back(marker);
--limit; --limit;

View File

@ -427,17 +427,17 @@ namespace triagens {
docs.reserve(batchSize); docs.reserve(batchSize);
} }
TRI_ASSERT(batchSize > 0); TRI_ASSERT(batchSize > 0);
TRI_doc_mptr_t const* ptr = nullptr;
while (count < batchSize) { while (count < batchSize) {
ptr = document->primaryIndex()->lookupSequential(internalSkip, total); TRI_doc_mptr_t const* mptr = document->primaryIndex()->lookupSequential(internalSkip, total);
if (ptr == nullptr) { if (mptr == nullptr) {
break; break;
} }
if (skip > 0) { if (skip > 0) {
--skip; --skip;
} }
else { else {
docs.emplace_back(*ptr); docs.emplace_back(*mptr);
if (++count >= limit) { if (++count >= limit) {
break; break;
} }
@ -481,13 +481,14 @@ namespace triagens {
uint64_t numRead = 0; uint64_t numRead = 0;
TRI_ASSERT(batchSize > 0); TRI_ASSERT(batchSize > 0);
while (numRead < batchSize) { while (numRead < batchSize) {
auto d = document->primaryIndex()->lookupRandom(initialPosition, position, step, total); auto mptr = document->primaryIndex()->lookupRandom(initialPosition, position, step, total);
if (d == nullptr) { if (mptr == nullptr) {
// Read all documents randomly // Read all documents randomly
break; break;
} }
docs.emplace_back(*d); docs.emplace_back(*mptr);
++numRead; ++numRead;
} }
this->unlock(trxCollection, TRI_TRANSACTION_READ); this->unlock(trxCollection, TRI_TRANSACTION_READ);
@ -815,6 +816,7 @@ namespace triagens {
uint64_t pos = 0; uint64_t pos = 0;
uint64_t step = 0; uint64_t step = 0;
uint64_t total = 0; uint64_t total = 0;
TRI_doc_mptr_t* found = idx->lookupRandom(intPos, pos, step, total); TRI_doc_mptr_t* found = idx->lookupRandom(intPos, pos, step, total);
if (found != nullptr) { if (found != nullptr) {
*mptr = *found; *mptr = *found;
@ -847,16 +849,18 @@ namespace triagens {
} }
auto idx = document->primaryIndex(); auto idx = document->primaryIndex();
size_t used = idx->size(); size_t used = idx->size();
if (used > 0) { if (used > 0) {
uint64_t step = 0; uint64_t step = 0;
uint64_t total = 0; uint64_t total = 0;
TRI_doc_mptr_t const* d = nullptr;
while (true) { while (true) {
d = idx->lookupSequential(step, total); TRI_doc_mptr_t const* mptr = idx->lookupSequential(step, total);
if (d == nullptr) {
if (mptr == nullptr) {
break; break;
} }
ids.emplace_back(TRI_EXTRACT_MARKER_KEY(d)); ids.emplace_back(TRI_EXTRACT_MARKER_KEY(mptr));
} }
} }
@ -962,27 +966,30 @@ namespace triagens {
uint64_t count = 0; uint64_t count = 0;
auto idx = document->primaryIndex(); auto idx = document->primaryIndex();
TRI_doc_mptr_t const* ptr = nullptr; TRI_doc_mptr_t const* mptr = nullptr;
if (skip < 0) { if (skip < 0) {
uint64_t position = UINT64_MAX; uint64_t position = UINT64_MAX;
do { do {
ptr = idx->lookupSequentialReverse(position); mptr = idx->lookupSequentialReverse(position);
++skip; ++skip;
} }
while (skip < 0 && ptr != nullptr); while (skip < 0 && mptr != nullptr);
if (ptr == nullptr) {
if (mptr == nullptr) {
this->unlock(trxCollection, TRI_TRANSACTION_READ); this->unlock(trxCollection, TRI_TRANSACTION_READ);
// To few elements, skipped all // To few elements, skipped all
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
do { do {
ptr = idx->lookupSequentialReverse(position); mptr = idx->lookupSequentialReverse(position);
if (ptr == nullptr) {
if (mptr == nullptr) {
break; break;
} }
++count; ++count;
docs.emplace_back(*ptr); docs.emplace_back(*mptr);
} }
while (count < limit); while (count < limit);
@ -992,23 +999,25 @@ namespace triagens {
uint64_t position = 0; uint64_t position = 0;
while (skip > 0) { while (skip > 0) {
ptr = idx->lookupSequential(position, total); mptr = idx->lookupSequential(position, total);
--skip; --skip;
if (ptr == nullptr) { if (mptr == nullptr) {
// To few elements, skipped all // To few elements, skipped all
this->unlock(trxCollection, TRI_TRANSACTION_READ); this->unlock(trxCollection, TRI_TRANSACTION_READ);
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
} }
do { do {
ptr = idx->lookupSequential(position, total); mptr = idx->lookupSequential(position, total);
if (ptr == nullptr) { if (mptr == nullptr) {
break; break;
} }
++count; ++count;
docs.emplace_back(*ptr); docs.emplace_back(*mptr);
} }
while (count < limit); while (count < limit);
this->unlock(trxCollection, TRI_TRANSACTION_READ); this->unlock(trxCollection, TRI_TRANSACTION_READ);
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }
@ -1034,15 +1043,18 @@ namespace triagens {
uint64_t position = 0; uint64_t position = 0;
uint64_t total = 0; uint64_t total = 0;
auto idx = document->primaryIndex(); auto idx = document->primaryIndex();
TRI_doc_mptr_t const* ptr = nullptr;
docs.reserve(idx->size()); docs.reserve(idx->size());
while (true) { while (true) {
ptr = idx->lookupSequential(position, total); TRI_doc_mptr_t const* mptr = idx->lookupSequential(position, total);
if (ptr == nullptr) {
if (mptr == nullptr) {
break; break;
} }
docs.emplace_back(ptr);
docs.emplace_back(mptr);
} }
this->unlock(trxCollection, TRI_TRANSACTION_READ); this->unlock(trxCollection, TRI_TRANSACTION_READ);
return TRI_ERROR_NO_ERROR; return TRI_ERROR_NO_ERROR;
} }

View File

@ -2585,10 +2585,11 @@ size_t TRI_DocumentIteratorDocumentCollection (TransactionBase const*,
if (nrUsed > 0) { if (nrUsed > 0) {
uint64_t position = 0; uint64_t position = 0;
uint64_t total = 0; uint64_t total = 0;
TRI_doc_mptr_t const* ptr = nullptr;
while (true) { while (true) {
ptr = idx->lookupSequential(position, total); TRI_doc_mptr_t const* mptr = idx->lookupSequential(position, total);
if (ptr == nullptr || ! callback(ptr, document, data)) {
if (mptr == nullptr || ! callback(mptr, document, data)) {
break; break;
} }
} }
@ -3191,13 +3192,19 @@ static int FillIndexBatch (TRI_document_collection_t* document,
std::vector<TRI_doc_mptr_t const*> documents; std::vector<TRI_doc_mptr_t const*> documents;
documents.reserve(blockSize); documents.reserve(blockSize);
if (nrUsed > 0) { if (nrUsed > 0) {
uint64_t position = 0; uint64_t position = 0;
uint64_t total = 0; uint64_t total = 0;
TRI_doc_mptr_t const* mptr; while (true) {
do { TRI_doc_mptr_t const* mptr = primaryIndex->lookupSequential(position, total);
mptr = primaryIndex->lookupSequential(position, total);
if (mptr == nullptr) {
break;
}
documents.emplace_back(mptr); documents.emplace_back(mptr);
if (documents.size() == blockSize) { if (documents.size() == blockSize) {
res = idx->batchInsert(&documents, indexPool->numThreads()); res = idx->batchInsert(&documents, indexPool->numThreads());
documents.clear(); documents.clear();
@ -3208,7 +3215,6 @@ static int FillIndexBatch (TRI_document_collection_t* document,
} }
} }
} }
while (mptr != nullptr);
} }
// process the remainder of the documents // process the remainder of the documents
@ -3257,13 +3263,16 @@ static int FillIndexSequential (TRI_document_collection_t* document,
if (nrUsed > 0) { if (nrUsed > 0) {
uint64_t position = 0; uint64_t position = 0;
uint64_t total = 0; uint64_t total = 0;
TRI_doc_mptr_t const* mptr = nullptr;
while (true) { while (true) {
mptr = primaryIndex->lookupSequential(position, total); TRI_doc_mptr_t const* mptr = primaryIndex->lookupSequential(position, total);
if (mptr == nullptr) { if (mptr == nullptr) {
break; break;
} }
int res = idx->insert(mptr, false); int res = idx->insert(mptr, false);
if (res != TRI_ERROR_NO_ERROR) { if (res != TRI_ERROR_NO_ERROR) {
return res; return res;
} }