diff --git a/lib/Basics/AssocMulti.h b/lib/Basics/AssocMulti.h
index 2910c796b9..966dc0a0c0 100644
--- a/lib/Basics/AssocMulti.h
+++ b/lib/Basics/AssocMulti.h
@@ -25,7 +25,7 @@
 /// @author Dr. Frank Celler
 /// @author Martin Schoenert
 /// @author Max Neunhoeffer
-/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
+/// @author Copyright 2014-2015, ArangoDB GmbH, Cologne, Germany
 /// @author Copyright 2006-2014, triAGENS GmbH, Cologne, Germany
 ////////////////////////////////////////////////////////////////////////////////
 
diff --git a/lib/Basics/AssocUnique.h b/lib/Basics/AssocUnique.h
index 926169fd9c..4f830ecad0 100644
--- a/lib/Basics/AssocUnique.h
+++ b/lib/Basics/AssocUnique.h
@@ -24,7 +24,8 @@
 ///
 /// @author Dr. Frank Celler
 /// @author Martin Schoenert
-/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
+/// @author Michael Hackstein
+/// @author Copyright 2014-2015, ArangoDB GmbH, Cologne, Germany
 /// @author Copyright 2006-2013, triAGENS GmbH, Cologne, Germany
 ////////////////////////////////////////////////////////////////////////////////
 
@@ -35,6 +36,7 @@
 #include "Basics/gcd.h"
 #include "Basics/JsonHelper.h"
 #include "Basics/logging.h"
+#include "Basics/MutexLocker.h"
 #include "Basics/random.h"
 
 namespace triagens {
@@ -242,10 +244,10 @@
 /// @brief check a resize of the hash array
 ////////////////////////////////////////////////////////////////////////////////
 
-        bool checkResize (Bucket& b) {
-          if (2 * b._nrAlloc < 3 * b._nrUsed) {
+        bool checkResize (Bucket& b, uint64_t expected) {
+          if (2 * b._nrAlloc < 3 * (b._nrUsed + expected)) {
             try {
-              resizeInternal(b, 2 * b._nrAlloc + 1, false);
+              resizeInternal(b, 2 * (b._nrAlloc + expected) + 1, false);
             }
             catch (...) {
               return false;
@@ -272,6 +274,41 @@
           return nullptr;
         }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief insert a document into the given bucket
+/// This does not resize and expects the bucket to have enough free space
+////////////////////////////////////////////////////////////////////////////////
+
+        int doInsert (Key const* key,
+                      Element* element,
+                      Bucket& b,
+                      uint64_t hash) {
+
+          uint64_t const n = b._nrAlloc;
+          uint64_t i = hash % n;
+          uint64_t k = i;
+
+          for (; i < n && b._table[i] != nullptr &&
+                 ! _isEqualKeyElement(key, hash, b._table[i]); ++i);   // linear probing
+          if (i == n) {
+            for (i = 0; i < k && b._table[i] != nullptr &&
+                   ! _isEqualKeyElement(key, hash, b._table[i]); ++i);   // wrap around
+          }
+
+          Element* arrayElement = b._table[i];
+
+          if (arrayElement != nullptr) {
+            return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
+          }
+
+          b._table[i] = static_cast<Element*>(element);
+          TRI_ASSERT(b._table[i] != nullptr);
+          b._nrUsed++;
+
+          return TRI_ERROR_NO_ERROR;
+        }
+
 // -----------------------------------------------------------------------------
 // --SECTION--                                                  public functions
 // -----------------------------------------------------------------------------
@@ -414,35 +451,167 @@
                     bool isRollback) {
 
           uint64_t hash = _hashKey(key);
-          uint64_t i = hash;
-          Bucket& b = _buckets[i & _bucketsMask];
+          Bucket& b = _buckets[hash & _bucketsMask];
 
-          if (! checkResize(b)) {
+          if (! checkResize(b, 0)) {
             return TRI_ERROR_OUT_OF_MEMORY;
           }
 
-          uint64_t const n = b._nrAlloc;
-          i = i % n;
-          uint64_t k = i;
+          return doInsert(key, element, b, hash);
+        }
 
-          for (; i < n && b._table[i] != nullptr &&
-                 ! _isEqualKeyElement(key, hash, b._table[i]); ++i);
-          if (i == n) {
-            for (i = 0; i < k && b._table[i] != nullptr &&
-                   ! _isEqualKeyElement(key, hash, b._table[i]); ++i);
-          }
+////////////////////////////////////////////////////////////////////////////////
+/// @brief adds multiple elements to the array
+////////////////////////////////////////////////////////////////////////////////
+
+        int batchInsert (std::vector<std::pair<Key const*, Element const*>> const* data,
+                         size_t numThreads) {
+
+          std::atomic<int> res(TRI_ERROR_NO_ERROR);
+          std::vector<std::pair<Key const*, Element const*>> const& elements = *(data);
+
+          if (elements.size() < numThreads) {
+            numThreads = elements.size();
+          }
+          if (numThreads > _buckets.size()) {
+            numThreads = _buckets.size();
+          }
 
-          Element* arrayElement = b._table[i];
+          size_t const chunkSize = elements.size() / numThreads;
 
-          if (arrayElement != nullptr) {
-            return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
-          }
+          typedef std::vector<std::pair<std::pair<Key const*, Element const*>, uint64_t>> DocumentsPerBucket;
+          triagens::basics::Mutex bucketMapLocker;
+
+          std::unordered_map<uint64_t, std::vector<DocumentsPerBucket>> allBuckets;
+
+          // partition the work into some buckets
+          {
+            auto partitioner = [&] (size_t lower, size_t upper) -> void {
+              try {
+                std::unordered_map<uint64_t, DocumentsPerBucket> partitions;
+
+                for (size_t i = lower; i < upper; ++i) {
+                  uint64_t hash = _hashKey(elements[i].first);
+                  auto bucketId = hash & _bucketsMask;
+
+                  auto it = partitions.find(bucketId);
+
+                  if (it == partitions.end()) {
+                    it = partitions.emplace(bucketId, DocumentsPerBucket()).first;
+                  }
+
+                  (*it).second.emplace_back(std::make_pair(elements[i], hash));
+                }
+
+                // transfer ownership to the central map
+                MUTEX_LOCKER(bucketMapLocker);
+
+                for (auto& it : partitions) {
+                  auto it2 = allBuckets.find(it.first);
+
+                  if (it2 == allBuckets.end()) {
+                    it2 = allBuckets.emplace(it.first, std::vector<DocumentsPerBucket>()).first;
+                  }
+
+                  (*it2).second.emplace_back(std::move(it.second));
+                }
+              }
+              catch (...) {
+                res = TRI_ERROR_INTERNAL;
+              }
+            };
+
+            std::vector<std::thread> threads;
+            threads.reserve(numThreads);
+
+            try {
+              for (size_t i = 0; i < numThreads; ++i) {
+                size_t lower = i * chunkSize;
+                size_t upper = (i + 1) * chunkSize;
+
+                if (i + 1 == numThreads) {
+                  // last chunk, account for potential rounding errors
+                  upper = elements.size();
+                }
+                else if (upper > elements.size()) {
+                  upper = elements.size();
+                }
+
+                threads.emplace_back(std::thread(partitioner, lower, upper));
+              }
+            }
+            catch (...) {
+              res = TRI_ERROR_INTERNAL;
+            }
+
+            for (size_t i = 0; i < threads.size(); ++i) {
+              // must join threads, otherwise the program will crash
+              threads[i].join();
+            }
+          }
 
-          b._table[i] = static_cast<Element*>(element);
-          TRI_ASSERT(b._table[i] != nullptr);
-          b._nrUsed++;
+          if (res.load() != TRI_ERROR_NO_ERROR) {
+            return res.load();
+          }
 
-          return TRI_ERROR_NO_ERROR;
+          // now the data is partitioned...
+
+          // now insert the bucket data in parallel
+          {
+            auto inserter = [&] (size_t chunk) -> void {
+              try {
+                for (auto const& it : allBuckets) {
+                  uint64_t bucketId = it.first;
+
+                  if (bucketId % numThreads != chunk) {
+                    // we're not responsible for this bucket!
+                    continue;
+                  }
+
+                  // we're responsible for this bucket!
+                  Bucket& b = _buckets[bucketId];
+                  uint64_t expected = 0;
+
+                  for (auto const& it2 : it.second) {
+                    expected += it2.size();
+                  }
+
+                  if (! checkResize(b, expected)) {
+                    res = TRI_ERROR_OUT_OF_MEMORY;
+                    return;
+                  }
+
+                  for (auto const& it2 : it.second) {
+                    for (auto const& it3 : it2) {
+                      doInsert(it3.first.first, const_cast<Element*>(it3.first.second), b, it3.second);
+                    }
+                  }
+                }
+              }
+              catch (...) {
+                res = TRI_ERROR_INTERNAL;
+              }
+            };
+
+            std::vector<std::thread> threads;
+            threads.reserve(numThreads);
+
+            try {
+              for (size_t i = 0; i < numThreads; ++i) {
+                threads.emplace_back(std::thread(inserter, i));
+              }
+            }
+            catch (...) {
+              res = TRI_ERROR_INTERNAL;
+            }
+
+            for (size_t i = 0; i < threads.size(); ++i) {
+              // must join threads, otherwise the program will crash
+              threads[i].join();
+            }
+          }
+
+          return res.load();
         }
 
 ////////////////////////////////////////////////////////////////////////////////
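
Side note on the pattern, with a compilable sketch (not part of the patch): batchInsert works in two phases. Phase 1 partitions the input in parallel, with each thread hashing one slice of the documents, grouping them by destination bucket in a thread-local map, and merging that map into a shared one under a mutex. Phase 2 then inserts in parallel without any locking, because bucket ids are assigned to threads by bucketId % numThreads, so every bucket is written by exactly one thread. The sketch below shows the same scheme against a plain vector-of-vectors "table"; all names in it (parallelBucketInsert, perBucket) are invented for the example, std::mutex stands in for triagens::basics::Mutex/MUTEX_LOCKER, and the unique-constraint check and atomic error propagation of the real code are omitted.

#include <algorithm>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

// two-phase parallel bucket insert; T must be hashable via std::hash<T>
template <typename T>
std::vector<std::vector<T>> parallelBucketInsert (std::vector<T> const& input,
                                                  size_t numBuckets,
                                                  size_t numThreads) {
  std::vector<std::vector<T>> buckets(numBuckets);

  if (input.empty() || numBuckets == 0) {
    return buckets;
  }
  // clamp the thread count, and keep it >= 1 so chunkSize below is well-defined
  numThreads = std::max<size_t>(1, std::min({ numThreads, input.size(), numBuckets }));

  std::mutex mapLock;
  std::unordered_map<size_t, std::vector<T>> perBucket;
  size_t const chunkSize = input.size() / numThreads;

  // phase 1: partition one slice [lower, upper) by destination bucket,
  // then merge the thread-local result into the shared map under the mutex
  auto partitioner = [&] (size_t lower, size_t upper) {
    std::unordered_map<size_t, std::vector<T>> local;
    for (size_t i = lower; i < upper; ++i) {
      local[std::hash<T>()(input[i]) % numBuckets].push_back(input[i]);
    }
    std::lock_guard<std::mutex> guard(mapLock);
    for (auto& p : local) {
      auto& dst = perBucket[p.first];
      dst.insert(dst.end(), p.second.begin(), p.second.end());
    }
  };

  std::vector<std::thread> threads;
  for (size_t i = 0; i < numThreads; ++i) {
    // the last thread absorbs the rounding remainder
    size_t upper = (i + 1 == numThreads) ? input.size() : (i + 1) * chunkSize;
    threads.emplace_back(partitioner, i * chunkSize, upper);
  }
  for (auto& t : threads) {
    t.join();
  }
  threads.clear();

  // phase 2: thread `chunk` exclusively owns all buckets whose
  // id % numThreads == chunk, so no locking is needed here
  auto inserter = [&] (size_t chunk) {
    for (auto& p : perBucket) {
      if (p.first % numThreads == chunk) {
        buckets[p.first] = std::move(p.second);
      }
    }
  };

  for (size_t i = 0; i < numThreads; ++i) {
    threads.emplace_back(inserter, i);
  }
  for (auto& t : threads) {
    t.join();
  }

  return buckets;
}

Reading perBucket concurrently in phase 2 is safe because no thread mutates the map's structure there; each thread only moves out the mapped vectors it exclusively owns via the modulo rule, which is the same reason the patch's inserter lambda needs no mutex. The explicit guards up front also matter: like batchInsert above, the sketch computes input.size() / numThreads, which must never run with a thread count of zero (e.g., for empty input).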