mirror of https://gitee.com/bigwinds/arangodb
IndexRangeBlock working for hash indexes.
This commit is contained in:
parent
30264eb6d2
commit
bf952d37be
|
@ -356,6 +356,7 @@ void ExecutionBlock::clearRegisters (AqlItemBlock* result) {
|
|||
}
|
||||
|
||||
size_t ExecutionBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||
std::cout << "ExecutionBlock::skipSome\n";
|
||||
TRI_ASSERT(0 < atLeast && atLeast <= atMost);
|
||||
size_t skipped = 0;
|
||||
AqlItemBlock* result = nullptr;
|
||||
|
@ -370,6 +371,7 @@ size_t ExecutionBlock::skipSome (size_t atLeast, size_t atMost) {
|
|||
// skip exactly <number> outputs, returns <true> if _done after
|
||||
// skipping, and <false> otherwise . . .
|
||||
bool ExecutionBlock::skip (size_t number) {
|
||||
std::cout << "ExecutionBlock::skip\n";
|
||||
size_t skipped = skipSome(number, number);
|
||||
size_t nr = skipped;
|
||||
while ( nr != 0 && skipped < number ){
|
||||
|
@ -410,7 +412,8 @@ int ExecutionBlock::getOrSkipSome (size_t atLeast,
|
|||
bool skipping,
|
||||
AqlItemBlock*& result,
|
||||
size_t& skipped) {
|
||||
|
||||
|
||||
std::cout << "ExecutionBlock::getOrSkipSome\n";
|
||||
TRI_ASSERT(result == nullptr && skipped == 0);
|
||||
if (_done) {
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
@ -777,42 +780,45 @@ IndexRangeBlock::IndexRangeBlock (ExecutionEngine* engine,
|
|||
IndexRangeNode const* ep)
|
||||
: ExecutionBlock(engine, ep),
|
||||
_collection(ep->_collection),
|
||||
_posInAllDocs(0) {
|
||||
_posInDocs(0) {
|
||||
|
||||
// std::cout << "USING INDEX: " << ep->_index->_iid << ", " << TRI_TypeNameIndex(ep->_index->_type) << "\n";
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_NOT_IMPLEMENTED);
|
||||
std::cout << "USING INDEX: " << ep->_index->_iid << ", " <<
|
||||
TRI_TypeNameIndex(ep->_index->_type) << "\n";
|
||||
|
||||
}
|
||||
|
||||
IndexRangeBlock::~IndexRangeBlock () {
|
||||
}
|
||||
|
||||
bool IndexRangeBlock::moreDocuments () {
|
||||
bool IndexRangeBlock::readIndex () {
|
||||
|
||||
if (_documents.empty()) {
|
||||
_documents.reserve(DefaultBatchSize);
|
||||
}
|
||||
|
||||
_documents.clear();
|
||||
|
||||
/*
|
||||
auto en = static_cast<IndexRangeNode const*>(getPlanNode());
|
||||
|
||||
/*
|
||||
if (en->_index->_type == TRI_IDX_TYPE_SKIPLIST_INDEX) {
|
||||
readSkiplistIndex();
|
||||
}
|
||||
else if (en->_index->_type == TRI_IDX_TYPE_HASH_INDEX) {
|
||||
else */
|
||||
if (en->_index->_type == TRI_IDX_TYPE_HASH_INDEX) {
|
||||
readHashIndex();
|
||||
}
|
||||
else {
|
||||
TRI_ASSERT(false);
|
||||
}
|
||||
*/
|
||||
return (! _documents.empty());
|
||||
return (!_documents.empty());
|
||||
}
|
||||
|
||||
int IndexRangeBlock::initialize () {
|
||||
int res = ExecutionBlock::initialize();
|
||||
|
||||
readIndex(); // this is currently only done once in the lifetime of the node
|
||||
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
if (_trx->orderBarrier(_trx->trxCollection(_collection->cid())) == nullptr) {
|
||||
res = TRI_ERROR_OUT_OF_MEMORY;
|
||||
|
@ -827,14 +833,9 @@ int IndexRangeBlock::initCursor (AqlItemBlock* items, size_t pos) {
|
|||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
|
||||
initDocuments();
|
||||
|
||||
if (_totalCount == 0) {
|
||||
_done = true;
|
||||
}
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
_pos = 0;
|
||||
_posInDocs = 0;
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -843,6 +844,7 @@ int IndexRangeBlock::initCursor (AqlItemBlock* items, size_t pos) {
|
|||
|
||||
AqlItemBlock* IndexRangeBlock::getSome (size_t atLeast,
|
||||
size_t atMost) {
|
||||
std::cout << "IndexRangeBlock::getSome\n";
|
||||
if (_done) {
|
||||
return nullptr;
|
||||
}
|
||||
|
@ -853,15 +855,14 @@ AqlItemBlock* IndexRangeBlock::getSome (size_t atLeast,
|
|||
return nullptr;
|
||||
}
|
||||
_pos = 0; // this is in the first block
|
||||
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0,
|
||||
// otherwise _done would be true already
|
||||
_posInDocs = 0; // position in _documents . . .
|
||||
}
|
||||
|
||||
// If we get here, we do have _buffer.front()
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
size_t const curRegs = cur->getNrRegs();
|
||||
|
||||
size_t available = _documents.size() - _posInAllDocs;
|
||||
size_t available = _documents.size() - _posInDocs;
|
||||
size_t toSend = std::min(atMost, available);
|
||||
|
||||
unique_ptr<AqlItemBlock> res(new AqlItemBlock(toSend, _varOverview->nrRegs[_depth]));
|
||||
|
@ -889,76 +890,77 @@ AqlItemBlock* IndexRangeBlock::getSome (size_t atLeast,
|
|||
// but can just take cur->getNrRegs() as registerId:
|
||||
res->setValue(j, curRegs,
|
||||
AqlValue(reinterpret_cast<TRI_df_marker_t
|
||||
const*>(_documents[_posInAllDocs++].getDataPtr())));
|
||||
const*>(_documents[_posInDocs++].getDataPtr())));
|
||||
// No harm done, if the setValue throws!
|
||||
}
|
||||
|
||||
// Advance read position:
|
||||
if (_posInAllDocs >= _documents.size()) {
|
||||
if (_posInDocs >= _documents.size()) {
|
||||
// we have exhausted our local documents buffer
|
||||
_posInAllDocs = 0;
|
||||
_posInDocs = 0;
|
||||
|
||||
// fetch more documents into our buffer
|
||||
if (! moreDocuments()) {
|
||||
// nothing more to read, re-initialize fetching of documents
|
||||
initDocuments();
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
delete cur;
|
||||
_pos = 0;
|
||||
}
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
delete cur;
|
||||
_pos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear out registers no longer needed later:
|
||||
clearRegisters(res.get());
|
||||
return res.release();
|
||||
}
|
||||
|
||||
size_t IndexRangeBlock::skipSome (size_t atLeast, size_t atMost) {
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief skipSome
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t IndexRangeBlock::skipSome (size_t atLeast,
|
||||
size_t atMost) {
|
||||
|
||||
std::cout << "IndexRangeBlock::skipSome\n";
|
||||
|
||||
if (_done) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t skipped = 0;
|
||||
|
||||
if (_done) {
|
||||
return skipped;
|
||||
}
|
||||
|
||||
while (skipped < atLeast) {
|
||||
while (skipped < atLeast ){
|
||||
if (_buffer.empty()) {
|
||||
if (! getBlock(DefaultBatchSize, DefaultBatchSize)) {
|
||||
if (! ExecutionBlock::getBlock(DefaultBatchSize, DefaultBatchSize)) {
|
||||
_done = true;
|
||||
return skipped;
|
||||
return 0;
|
||||
}
|
||||
_pos = 0; // this is in the first block
|
||||
_posInAllDocs = 0; // Note that we know _allDocs.size() > 0,
|
||||
// otherwise _done would be true already
|
||||
}
|
||||
|
||||
// if we get here, then _buffer.front() exists
|
||||
// If we get here, we do have _buffer.front()
|
||||
AqlItemBlock* cur = _buffer.front();
|
||||
|
||||
if (atMost >= skipped + _documents.size() - _posInAllDocs) {
|
||||
skipped += _documents.size() - _posInAllDocs;
|
||||
_posInAllDocs = 0;
|
||||
_posInDocs += std::min(atMost, _documents.size() - _posInDocs);
|
||||
|
||||
// fetch more documents into our buffer
|
||||
if (! moreDocuments()) {
|
||||
// nothing more to read, re-initialize fetching of documents
|
||||
initDocuments();
|
||||
if (++_pos >= cur->size()) {
|
||||
_buffer.pop_front(); // does not throw
|
||||
delete cur;
|
||||
_pos = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
_posInAllDocs += atMost - skipped;
|
||||
|
||||
if (atMost < _documents.size() - _posInDocs){
|
||||
// eat just enough of _documents . . .
|
||||
_posInDocs += atMost;
|
||||
skipped = atMost;
|
||||
}
|
||||
else {
|
||||
// eat the whole of the current inVariable and proceed . . .
|
||||
skipped += (_documents.size() - _posInDocs);
|
||||
_posInDocs = 0;
|
||||
delete cur;
|
||||
_buffer.pop_front();
|
||||
_pos = 0;
|
||||
|
||||
}
|
||||
}
|
||||
return skipped;
|
||||
|
||||
return skipped;
|
||||
}
|
||||
/*
|
||||
void IndexRangeBlock::readSkiplistIndex () {
|
||||
|
||||
/* void IndexRangeBlock::readSkiplistIndex () {
|
||||
auto en = static_cast<IndexRangeNode const*>(getPlanNode());
|
||||
TRI_index_t* idx = en->_index;
|
||||
TRI_ASSERT(idx != nullptr);
|
||||
|
@ -985,8 +987,7 @@ void IndexRangeBlock::readSkiplistIndex () {
|
|||
}
|
||||
|
||||
TRI_FreeSkiplistIterator(skiplistIterator);
|
||||
}
|
||||
|
||||
}*/
|
||||
|
||||
void IndexRangeBlock::readHashIndex () {
|
||||
auto en = static_cast<IndexRangeNode const*>(getPlanNode());
|
||||
|
@ -1012,7 +1013,8 @@ void IndexRangeBlock::readHashIndex () {
|
|||
auto setupSearchValue = [&]() {
|
||||
size_t const n = hashIndex->_paths._length;
|
||||
searchValue._length = 0;
|
||||
searchValue._values = static_cast<TRI_shaped_json_t*>(TRI_Allocate(TRI_CORE_MEM_ZONE, n * sizeof(TRI_shaped_json_t), true));
|
||||
searchValue._values = static_cast<TRI_shaped_json_t*>(TRI_Allocate(TRI_CORE_MEM_ZONE,
|
||||
n * sizeof(TRI_shaped_json_t), true));
|
||||
|
||||
if (searchValue._values == nullptr) {
|
||||
THROW_ARANGO_EXCEPTION(TRI_ERROR_OUT_OF_MEMORY);
|
||||
|
@ -1026,8 +1028,19 @@ void IndexRangeBlock::readHashIndex () {
|
|||
|
||||
char const* name = TRI_AttributeNameShapePid(shaper, pid);
|
||||
|
||||
for (auto x: en->_ranges.at(0)) {
|
||||
if (x->_attr == std::string(name)){//found attribute
|
||||
std::cout << "found " << x->_attr << "\n";
|
||||
std::cout << x->_low->_bound.toString() << "\n";
|
||||
auto shaped = TRI_ShapedJsonJson(shaper,
|
||||
JsonHelper::getArrayElement(x->_low->_bound.json(), "value"), false);
|
||||
std::cout << "MADE IT\n";
|
||||
searchValue._values[i] = *shaped;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "PID: " << pid << ", NAME: " << name << "\n";
|
||||
// searchValue._values[i] = TRI_ShapedJsonJson(shaper, src, false, true);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1043,7 +1056,6 @@ void IndexRangeBlock::readHashIndex () {
|
|||
|
||||
TRI_DestroyIndexResult(&list);
|
||||
}
|
||||
*/
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- class EnumerateListBlock
|
||||
|
|
|
@ -618,18 +618,12 @@ public:
|
|||
/// @brief initialize fetching of documents
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
void initDocuments () {
|
||||
/*void initDocuments () {
|
||||
_internalSkip = 0;
|
||||
if (! moreDocuments()) {
|
||||
_done = true;
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief continue fetching of documents
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool moreDocuments ();
|
||||
}*/
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief initialize, here we fetch all docs from the database
|
||||
|
@ -651,7 +645,7 @@ public:
|
|||
// things to skip overall.
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t skipSome (size_t atLeast, size_t atMost);
|
||||
virtual size_t skipSome (size_t atLeast, size_t atMost);
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// --SECTION-- private methods
|
||||
|
@ -659,6 +653,12 @@ public:
|
|||
|
||||
private:
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief continue fetching of documents
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
bool readIndex ();
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// @brief read using a skiplist index
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -705,7 +705,7 @@ public:
|
|||
/// @brief current position in _allDocs
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
size_t _posInAllDocs;
|
||||
size_t _posInDocs;
|
||||
|
||||
};
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ Optimizer::Optimizer () {
|
|||
|
||||
|
||||
// try to find a filter after an enumerate collection and find an index . . .
|
||||
//registerRule(useIndexRange, 999);
|
||||
registerRule(useIndexRange, 999);
|
||||
|
||||
// remove filters from the query that are not necessary at all
|
||||
// filters that are always true will be removed entirely
|
||||
|
|
|
@ -348,6 +348,7 @@ class FilterToEnumCollFinder : public WalkerWorker<ExecutionNode> {
|
|||
// check the first components of <map> against indexes of <node> . . .
|
||||
std::vector<std::string> attrs;
|
||||
std::vector<std::vector<RangeInfo*>> rangeInfo;
|
||||
rangeInfo.push_back(std::vector<RangeInfo*>());
|
||||
bool valid = true;
|
||||
bool eq = true;
|
||||
for (auto x : *map) {
|
||||
|
@ -360,7 +361,7 @@ class FilterToEnumCollFinder : public WalkerWorker<ExecutionNode> {
|
|||
}
|
||||
if (! _canThrow) {
|
||||
if (! valid){ // ranges are not valid . . .
|
||||
// std::cout << "INVALID RANGE!\n";
|
||||
std::cout << "INVALID RANGE!\n";
|
||||
|
||||
auto newPlan = _plan->clone();
|
||||
auto parents = node->getParents();
|
||||
|
@ -380,7 +381,7 @@ class FilterToEnumCollFinder : public WalkerWorker<ExecutionNode> {
|
|||
(idx->_type == TRI_IDX_TYPE_HASH_INDEX && eq)) {
|
||||
//can only use the index if it is a skip list or (a hash and we
|
||||
//are checking equality)
|
||||
// std::cout << "FOUND INDEX!\n";
|
||||
std::cout << "FOUND INDEX!\n";
|
||||
auto newPlan = _plan->clone();
|
||||
ExecutionNode* newNode = nullptr;
|
||||
try{
|
||||
|
|
Loading…
Reference in New Issue