mirror of https://gitee.com/bigwinds/arangodb
290 lines
8.8 KiB
C++
290 lines
8.8 KiB
C++
////////////////////////////////////////////////////////////////////////////////
|
|
/// @brief AQL SortBlock
|
|
///
|
|
/// @file
|
|
///
|
|
/// DISCLAIMER
|
|
///
|
|
/// Copyright 2010-2014 triagens GmbH, Cologne, Germany
|
|
///
|
|
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|
/// you may not use this file except in compliance with the License.
|
|
/// You may obtain a copy of the License at
|
|
///
|
|
/// http://www.apache.org/licenses/LICENSE-2.0
|
|
///
|
|
/// Unless required by applicable law or agreed to in writing, software
|
|
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
/// See the License for the specific language governing permissions and
|
|
/// limitations under the License.
|
|
///
|
|
/// Copyright holder is triAGENS GmbH, Cologne, Germany
|
|
///
|
|
/// @author Max Neunhoeffer
|
|
/// @author Copyright 2014, triagens GmbH, Cologne, Germany
|
|
////////////////////////////////////////////////////////////////////////////////
|
|
|
|
#include "Aql/SortBlock.h"
|
|
#include "Aql/ExecutionEngine.h"
|
|
#include "Basics/Exceptions.h"
|
|
#include "VocBase/vocbase.h"
|
|
|
|
using namespace std;
|
|
using namespace triagens::arango;
|
|
using namespace triagens::aql;
|
|
|
|
using Json = triagens::basics::Json;
|
|
using JsonHelper = triagens::basics::JsonHelper;
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// --SECTION-- class SortBlock
|
|
// -----------------------------------------------------------------------------
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
///
/// Looks up, for every element of the SortNode, the register assigned to the
/// element's variable in the node's register plan and stores that register id
/// together with the element's flag (its `second` member) in _sortRegisters.
////////////////////////////////////////////////////////////////////////////////

SortBlock::SortBlock (ExecutionEngine* engine,
                      SortNode const* en)
  : ExecutionBlock(engine, en),
    _sortRegisters(),
    _stable(en->_stable) {

  for (auto const& element : en->_elements) {
    // the variable of every sort element must be known to the register plan
    auto found = en->getRegisterPlan()->varInfo.find(element.first->id);
    TRI_ASSERT(found != en->getRegisterPlan()->varInfo.end());

    auto const registerId = found->second.registerId;
    TRI_ASSERT(registerId < ExecutionNode::MaxRegisterId);

    _sortRegisters.emplace_back(std::make_pair(registerId, element.second));
  }
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor, performs no work of its own
////////////////////////////////////////////////////////////////////////////////

SortBlock::~SortBlock () {}
|
|
|
|
int SortBlock::initialize () {
|
|
return ExecutionBlock::initialize();
|
|
}
|
|
|
|
////////////////////////////////////////////////////////////////////////////////
/// @brief initializeCursor
///
/// Runs the base class initializeCursor, then fetches blocks until getBlock
/// reports that there are no more, and sorts what has been collected in
/// _buffer. Returns the base class error code if that call fails, and
/// TRI_ERROR_NO_ERROR otherwise. An empty _buffer marks the block as done.
////////////////////////////////////////////////////////////////////////////////

int SortBlock::initializeCursor (AqlItemBlock* items, size_t pos) {
  int const res = ExecutionBlock::initializeCursor(items, pos);

  if (res != TRI_ERROR_NO_ERROR) {
    return res;
  }

  // pull in all blocks until getBlock signals that nothing is left
  for (;;) {
    if (! getBlock(DefaultBatchSize, DefaultBatchSize)) {
      break;
    }
  }

  if (! _buffer.empty()) {
    doSorting();

    // only reached if sorting did not throw
    _done = false;
    _pos = 0;
  }
  else {
    // nothing to sort, nothing to hand out
    _done = true;
  }

  return TRI_ERROR_NO_ERROR;
}
|
|
|
|
void SortBlock::doSorting () {
|
|
// coords[i][j] is the <j>th row of the <i>th block
|
|
std::vector<std::pair<size_t, size_t>> coords;
|
|
|
|
size_t sum = 0;
|
|
for (auto const& block : _buffer) {
|
|
sum += block->size();
|
|
}
|
|
|
|
TRI_IF_FAILURE("SortBlock::doSorting") {
|
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
|
}
|
|
coords.reserve(sum);
|
|
|
|
// install the coords
|
|
size_t count = 0;
|
|
|
|
for (auto const& block : _buffer) {
|
|
for (size_t i = 0; i < block->size(); i++) {
|
|
coords.emplace_back(std::make_pair(count, i));
|
|
}
|
|
count++;
|
|
}
|
|
|
|
std::vector<TRI_document_collection_t const*> colls;
|
|
for (RegisterId i = 0; i < _sortRegisters.size(); i++) {
|
|
colls.emplace_back(_buffer.front()->getDocumentCollection(_sortRegisters[i].first));
|
|
}
|
|
|
|
// comparison function
|
|
OurLessThan ourLessThan(_trx, _buffer, _sortRegisters, colls);
|
|
|
|
// sort coords
|
|
if (_stable) {
|
|
std::stable_sort(coords.begin(), coords.end(), ourLessThan);
|
|
}
|
|
else {
|
|
std::sort(coords.begin(), coords.end(), ourLessThan);
|
|
}
|
|
|
|
// here we collect the new blocks (later swapped into _buffer):
|
|
std::deque<AqlItemBlock*> newbuffer;
|
|
|
|
try { // If we throw from here, the catch will delete the new
|
|
// blocks in newbuffer
|
|
|
|
count = 0;
|
|
RegisterId const nrregs = _buffer.front()->getNrRegs();
|
|
|
|
// install the rearranged values from _buffer into newbuffer
|
|
|
|
while (count < sum) {
|
|
size_t sizeNext = (std::min)(sum - count, DefaultBatchSize);
|
|
AqlItemBlock* next = new AqlItemBlock(sizeNext, nrregs);
|
|
|
|
try {
|
|
TRI_IF_FAILURE("SortBlock::doSortingInner") {
|
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
|
}
|
|
newbuffer.emplace_back(next);
|
|
}
|
|
catch (...) {
|
|
delete next;
|
|
throw;
|
|
}
|
|
|
|
std::unordered_map<AqlValue, AqlValue> cache;
|
|
// only copy as much as needed!
|
|
for (size_t i = 0; i < sizeNext; i++) {
|
|
for (RegisterId j = 0; j < nrregs; j++) {
|
|
auto a = _buffer[coords[count].first]->getValue(coords[count].second, j);
|
|
// If we have already dealt with this value for the next
|
|
// block, then we just put the same value again:
|
|
if (! a.isEmpty()) {
|
|
auto it = cache.find(a);
|
|
|
|
if (it != cache.end()) {
|
|
AqlValue const& b = it->second;
|
|
// If one of the following throws, all is well, because
|
|
// the new block already has either a copy or stolen
|
|
// the AqlValue:
|
|
_buffer[coords[count].first]->eraseValue(coords[count].second, j);
|
|
next->setValue(i, j, b);
|
|
}
|
|
else {
|
|
// We need to copy a, if it has already been stolen from
|
|
// its original buffer, which we know by looking at the
|
|
// valueCount there.
|
|
auto vCount = _buffer[coords[count].first]->valueCount(a);
|
|
|
|
if (vCount == 0) {
|
|
// Was already stolen for another block
|
|
AqlValue b = a.clone();
|
|
try {
|
|
TRI_IF_FAILURE("SortBlock::doSortingCache") {
|
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
|
}
|
|
cache.emplace(make_pair(a, b));
|
|
}
|
|
catch (...) {
|
|
b.destroy();
|
|
throw;
|
|
}
|
|
|
|
try {
|
|
TRI_IF_FAILURE("SortBlock::doSortingNext1") {
|
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
|
}
|
|
next->setValue(i, j, b);
|
|
}
|
|
catch (...) {
|
|
b.destroy();
|
|
cache.erase(a);
|
|
throw;
|
|
}
|
|
// It does not matter whether the following works or not,
|
|
// since the original block keeps its responsibility
|
|
// for a:
|
|
_buffer[coords[count].first]->eraseValue(coords[count].second, j);
|
|
}
|
|
else {
|
|
// Here we are the first to want to inherit a, so we
|
|
// steal it:
|
|
_buffer[coords[count].first]->steal(a);
|
|
// If this has worked, responsibility is now with the
|
|
// new block or indeed with us!
|
|
try {
|
|
TRI_IF_FAILURE("SortBlock::doSortingNext2") {
|
|
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
|
|
}
|
|
next->setValue(i, j, a);
|
|
}
|
|
catch (...) {
|
|
a.destroy();
|
|
throw;
|
|
}
|
|
_buffer[coords[count].first]->eraseValue(coords[count].second, j);
|
|
// This might throw as well, however, the responsibility
|
|
// is already with the new block.
|
|
|
|
// If the following does not work, we will create a
|
|
// few unnecessary copies, but this does not matter:
|
|
cache.emplace(make_pair(a, a));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
count++;
|
|
}
|
|
cache.clear();
|
|
for (RegisterId j = 0; j < nrregs; j++) {
|
|
next->setDocumentCollection(j, _buffer.front()->getDocumentCollection(j));
|
|
}
|
|
}
|
|
}
|
|
catch (...) {
|
|
for (auto& x : newbuffer) {
|
|
delete x;
|
|
}
|
|
throw;
|
|
}
|
|
_buffer.swap(newbuffer); // does not throw since allocators
|
|
// are the same
|
|
for (auto& x : newbuffer) {
|
|
delete x;
|
|
}
|
|
}
|
|
|
|
// -----------------------------------------------------------------------------
|
|
// --SECTION-- class SortBlock::OurLessThan
|
|
// -----------------------------------------------------------------------------
|
|
|
|
bool SortBlock::OurLessThan::operator() (std::pair<size_t, size_t> const& a,
|
|
std::pair<size_t, size_t> const& b) {
|
|
|
|
size_t i = 0;
|
|
for (auto const& reg : _sortRegisters) {
|
|
|
|
int cmp = AqlValue::Compare(
|
|
_trx,
|
|
_buffer[a.first]->getValueReference(a.second, reg.first),
|
|
_colls[i],
|
|
_buffer[b.first]->getValueReference(b.second, reg.first),
|
|
_colls[i],
|
|
true
|
|
);
|
|
|
|
if (cmp < 0) {
|
|
return reg.second;
|
|
}
|
|
else if (cmp > 0) {
|
|
return ! reg.second;
|
|
}
|
|
i++;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
// Local Variables:
|
|
// mode: outline-minor
|
|
// outline-regexp: "^\\(/// @brief\\|/// {@inheritDoc}\\|/// @addtogroup\\|// --SECTION--\\|/// @\\}\\)"
|
|
// End:
|