//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Dr. Frank Celler /// @author Martin Schoenert /// @author Michael Hackstein /// @author Daniel H. Larkin //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_BASICS_ASSOC_UNIQUE_HELPERS_H #define ARANGODB_BASICS_ASSOC_UNIQUE_HELPERS_H 1 #include "Basics/Common.h" #include "Basics/IndexBucket.h" #include "Basics/LocalTaskQueue.h" #include "Basics/MutexLocker.h" namespace arangodb { namespace basics { struct BucketPosition { size_t bucketId; uint64_t position; BucketPosition() : bucketId(SIZE_MAX), position(0) {} void reset() { bucketId = SIZE_MAX - 1; position = 0; } bool operator==(BucketPosition const& other) const { return position == other.position && bucketId == other.bucketId; } }; template class UniqueInserterTask final : public LocalTask { private: typedef arangodb::basics::IndexBucket Bucket; typedef std::vector> DocumentsPerBucket; std::function _contextDestroyer; std::vector* _buckets; std::function _doInsert; std::function _checkResize; size_t _i; void* _userData; std::shared_ptr>> _allBuckets; public: UniqueInserterTask(std::shared_ptr queue, std::function contextDestroyer, std::vector* buckets, std::function doInsert, std::function checkResize, size_t i, void* userData, std::shared_ptr>> allBuckets) : LocalTask(queue), _contextDestroyer(contextDestroyer), _buckets(buckets), _doInsert(doInsert), _checkResize(checkResize), _i(i), _userData(userData), _allBuckets(allBuckets) {} void run() override { // actually insert them try { Bucket& b = (*_buckets)[static_cast(_i)]; for (auto const& it : (*_allBuckets)[_i]) { uint64_t expected = it.size(); if (!_checkResize(_userData, b, expected)) { _queue->setStatus(TRI_ERROR_OUT_OF_MEMORY); _queue->join(); return; } for (auto const& it2 : it) { int status = _doInsert(_userData, it2.first, b, it2.second); if (status != TRI_ERROR_NO_ERROR) { _queue->setStatus(status); _queue->join(); _contextDestroyer(_userData); return; } } } } catch (...) { _queue->setStatus(TRI_ERROR_INTERNAL); } _contextDestroyer(_userData); _queue->join(); } }; template class UniquePartitionerTask final : public LocalTask { private: typedef UniqueInserterTask Inserter; typedef std::vector> DocumentsPerBucket; std::function _hashElement; std::function _contextDestroyer; std::shared_ptr const> _data; std::vector const* _elements; size_t _lower; size_t _upper; void* _userData; std::shared_ptr>> _bucketFlags; std::shared_ptr> _bucketMapLocker; std::shared_ptr>> _allBuckets; std::shared_ptr>> _inserters; uint64_t _bucketsMask; public: UniquePartitionerTask(std::shared_ptr queue, std::function hashElement, std::function const& contextDestroyer, std::shared_ptr const> data, size_t lower, size_t upper, void* userData, std::shared_ptr>> bucketFlags, std::shared_ptr> bucketMapLocker, std::shared_ptr>> allBuckets, std::shared_ptr>> inserters) : LocalTask(queue), _hashElement(hashElement), _contextDestroyer(contextDestroyer), _data(data), _elements(_data.get()), _lower(lower), _upper(upper), _userData(userData), _bucketFlags(bucketFlags), _bucketMapLocker(bucketMapLocker), _allBuckets(allBuckets), _inserters(inserters), _bucketsMask(_allBuckets->size() - 1) {} void run() override { try { std::vector partitions; partitions.resize(_allBuckets->size()); // initialize to number of buckets for (size_t i = _lower; i < _upper; ++i) { uint64_t hashByKey = _hashElement((*_elements)[i], true); auto bucketId = hashByKey & _bucketsMask; partitions[bucketId].emplace_back((*_elements)[i], hashByKey); } // transfer ownership to the central map for (size_t i = 0; i < partitions.size(); ++i) { MUTEX_LOCKER(mutexLocker, (*_bucketMapLocker)[i]); (*_allBuckets)[i].emplace_back(std::move(partitions[i])); (*_bucketFlags)[i]--; if ((*_bucketFlags)[i].load() == 0) { // queue inserter for bucket i _queue->enqueue((*_inserters)[i]); } } } catch (...) { _queue->setStatus(TRI_ERROR_INTERNAL); } _contextDestroyer(_userData); _queue->join(); } }; } // namespace basics } // namespace arangodb #endif