
Added a general batch inserter for AssocUnique. Not used in any index yet

Michael Hackstein 2015-09-01 16:53:31 +02:00
parent 1278d8d24c
commit 9001e88a89
2 changed files with 192 additions and 23 deletions
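
Note: the new AssocUnique::batchInsert() introduced by this commit takes a pointer to a vector of (key, element) pointer pairs plus a thread count, clamps the thread count to the input size and the number of buckets, and splits the input into equal chunks, with the last chunk absorbing the rounding remainder. Below is a minimal standalone sketch of that input layout and chunking, using hypothetical toy Key/Element types rather than the ArangoDB class:

// Standalone sketch of how batchInsert's input is shaped and chunked.
// Key/Element and the sizes used here are hypothetical placeholders.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

struct Key     { uint64_t id; };
struct Element { uint64_t id; };

int main () {
  // a caller builds pairs of (key pointer, element pointer), as batchInsert expects
  std::vector<Key> keys(10);
  std::vector<Element> docs(10);
  std::vector<std::pair<Key const*, Element const*>> data;
  for (size_t i = 0; i < keys.size(); ++i) {
    data.emplace_back(&keys[i], &docs[i]);
  }
  // an index would then hand &data and numThreads to batchInsert()

  size_t numThreads = 4;
  size_t const numBuckets = 8;        // stand-in for _buckets.size()
  if (data.size() < numThreads) {
    numThreads = data.size();
  }
  if (numThreads > numBuckets) {
    numThreads = numBuckets;
  }

  size_t const chunkSize = data.size() / numThreads;
  for (size_t i = 0; i < numThreads; ++i) {
    size_t lower = i * chunkSize;
    size_t upper = (i + 1) * chunkSize;
    if (i + 1 == numThreads) {
      upper = data.size();            // last chunk absorbs the rounding remainder
    }
    std::cout << "thread " << i << " handles [" << lower << ", " << upper << ")\n";
  }
  return 0;
}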

View File

@@ -25,7 +25,7 @@
 /// @author Dr. Frank Celler
 /// @author Martin Schoenert
 /// @author Max Neunhoeffer
-/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
+/// @author Copyright 2014-2015, ArangoDB GmbH, Cologne, Germany
 /// @author Copyright 2006-2014, triAGENS GmbH, Cologne, Germany
 ////////////////////////////////////////////////////////////////////////////////

View File

@@ -24,7 +24,8 @@
 ///
 /// @author Dr. Frank Celler
 /// @author Martin Schoenert
-/// @author Copyright 2014, ArangoDB GmbH, Cologne, Germany
+/// @author Michael Hackstein
+/// @author Copyright 2014-2015, ArangoDB GmbH, Cologne, Germany
 /// @author Copyright 2006-2013, triAGENS GmbH, Cologne, Germany
 ////////////////////////////////////////////////////////////////////////////////
@@ -35,6 +36,7 @@
 #include "Basics/gcd.h"
 #include "Basics/JsonHelper.h"
 #include "Basics/logging.h"
+#include "Basics/MutexLocker.h"
 #include "Basics/random.h"
 
 namespace triagens {
@@ -242,10 +244,10 @@ namespace triagens {
 /// @brief check a resize of the hash array
 ////////////////////////////////////////////////////////////////////////////////
 
-        bool checkResize (Bucket& b) {
-          if (2 * b._nrAlloc < 3 * b._nrUsed) {
+        bool checkResize (Bucket& b, uint64_t expected) {
+          if (2 * (b._nrAlloc + expected) < 3 * b._nrUsed) {
             try {
-              resizeInternal(b, 2 * b._nrAlloc + 1, false);
+              resizeInternal(b, 2 * (b._nrAlloc + expected) + 1, false);
             }
             catch (...) {
               return false;
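
Note: the change above threads an expected count of soon-to-be-inserted elements through the growth check, so a bucket can be grown once up front for a whole batch instead of being checked element by element. Below is a standalone sketch of that idea, with a hypothetical shouldGrow()/growTo() pair that simply mirrors the arithmetic shown in the diff (not the ArangoDB class):

// Standalone sketch: decide on a resize up front for a whole batch.
// shouldGrow()/growTo() are hypothetical stand-ins mirroring the
// arithmetic of checkResize()/resizeInternal() in the diff above.
#include <cstdint>
#include <iostream>

struct ToyBucket {
  uint64_t nrAlloc;   // slots allocated
  uint64_t nrUsed;    // slots occupied
};

static bool shouldGrow (ToyBucket const& b, uint64_t expected) {
  return 2 * (b.nrAlloc + expected) < 3 * b.nrUsed;
}

static void growTo (ToyBucket& b, uint64_t expected) {
  b.nrAlloc = 2 * (b.nrAlloc + expected) + 1;
}

int main () {
  ToyBucket b{64, 60};
  uint64_t const expected = 10;        // size of the incoming batch

  if (shouldGrow(b, expected)) {       // one check for the whole batch...
    growTo(b, expected);               // ...and at most one reallocation
  }
  std::cout << "allocated slots: " << b.nrAlloc << "\n";
  return 0;
}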
@@ -272,6 +274,41 @@
           return nullptr;
         }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief Insert a document into the given bucket
+///        This does not resize and expects to have enough space
+////////////////////////////////////////////////////////////////////////////////
+
+        int doInsert (Key const* key,
+                      Element* element,
+                      Bucket& b,
+                      uint64_t hash) {
+          uint64_t const n = b._nrAlloc;
+          uint64_t i = hash % n;
+          uint64_t k = i;
+
+          for (; i < n && b._table[i] != nullptr &&
+                 ! _isEqualKeyElement(key, hash, b._table[i]); ++i);
+          if (i == n) {
+            for (i = 0; i < k && b._table[i] != nullptr &&
+                   ! _isEqualKeyElement(key, hash, b._table[i]); ++i);
+          }
+
+          Element* arrayElement = b._table[i];
+
+          if (arrayElement != nullptr) {
+            return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
+          }
+
+          b._table[i] = static_cast<Element*>(element);
+          TRI_ASSERT(b._table[i] != nullptr);
+          b._nrUsed++;
+
+          return TRI_ERROR_NO_ERROR;
+        }
+
 // -----------------------------------------------------------------------------
 // --SECTION--                                                  public functions
 // -----------------------------------------------------------------------------
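
Note: the doInsert() helper added above is a linear-probing insert. It starts at hash % n, scans forward for a free slot or an equal key, wraps around to the start of the table once, and reports a unique-constraint violation if the slot it stops at is occupied. Below is a self-contained sketch of that probe pattern on a plain pointer array (toy hash, toy key comparison, and a plain -1 error code instead of TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED; it assumes the caller guarantees free space, as checkResize() does in the real class):

// Standalone sketch of the linear-probe insert used by doInsert():
// scan from hash % n, wrap around once, fail on an equal key.
#include <cstdint>
#include <iostream>
#include <vector>

struct Element { uint64_t key; };

static bool isEqualKey (uint64_t key, Element const* e) {
  return e->key == key;
}

// returns 0 on success, -1 on "unique constraint violated";
// assumes at least one free slot, so a non-null stop means an equal key
static int probeInsert (std::vector<Element*>& table, Element* element) {
  uint64_t const n = table.size();
  uint64_t const hash = element->key;   // toy hash
  uint64_t i = hash % n;
  uint64_t k = i;

  // probe forward until a free slot or an equal key is found
  for (; i < n && table[i] != nullptr && ! isEqualKey(element->key, table[i]); ++i);
  if (i == n) {
    // wrap around and probe the prefix [0, k)
    for (i = 0; i < k && table[i] != nullptr && ! isEqualKey(element->key, table[i]); ++i);
  }

  if (table[i] != nullptr) {
    return -1;                          // duplicate key
  }
  table[i] = element;
  return 0;
}

int main () {
  std::vector<Element*> table(8, nullptr);
  Element a{3}, b{11}, c{3};
  std::cout << probeInsert(table, &a) << "\n";   // 0
  std::cout << probeInsert(table, &b) << "\n";   // 0 (collides with a, probes on)
  std::cout << probeInsert(table, &c) << "\n";   // -1 (duplicate key)
  return 0;
}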
@@ -414,35 +451,167 @@
                     bool isRollback) {
           uint64_t hash = _hashKey(key);
-          uint64_t i = hash;
-          Bucket& b = _buckets[i & _bucketsMask];
+          Bucket& b = _buckets[hash & _bucketsMask];
 
-          if (! checkResize(b)) {
+          if (! checkResize(b, 0)) {
             return TRI_ERROR_OUT_OF_MEMORY;
           }
 
-          uint64_t const n = b._nrAlloc;
-          i = i % n;
-          uint64_t k = i;
-
-          for (; i < n && b._table[i] != nullptr &&
-                 ! _isEqualKeyElement(key, hash, b._table[i]); ++i);
-          if (i == n) {
-            for (i = 0; i < k && b._table[i] != nullptr &&
-                   ! _isEqualKeyElement(key, hash, b._table[i]); ++i);
-          }
-
-          Element* arrayElement = b._table[i];
-
-          if (arrayElement != nullptr) {
-            return TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED;
-          }
-
-          b._table[i] = static_cast<Element*>(element);
-          TRI_ASSERT(b._table[i] != nullptr);
-          b._nrUsed++;
-
-          return TRI_ERROR_NO_ERROR;
+          return doInsert(key, element, b, hash);
         }
 
+////////////////////////////////////////////////////////////////////////////////
+/// @brief adds multiple elements to the array
+////////////////////////////////////////////////////////////////////////////////
+
+        int batchInsert (std::vector<std::pair<Key const*, Element const*>> const* data,
+                         size_t numThreads) {
+          std::atomic<int> res(TRI_ERROR_NO_ERROR);
+          std::vector<std::pair<Key const*, Element const*>> const& elements = *(data);
+
+          if (elements.size() < numThreads) {
+            numThreads = elements.size();
+          }
+          if (numThreads > _buckets.size()) {
+            numThreads = _buckets.size();
+          }
+
+          size_t const chunkSize = elements.size() / numThreads;
+
+          typedef std::vector<std::pair<std::pair<Key const*, Element const*>, uint64_t>> DocumentsPerBucket;
+
+          triagens::basics::Mutex bucketMapLocker;
+          std::unordered_map<uint64_t, std::vector<DocumentsPerBucket>> allBuckets;
+
+          // partition the work into some buckets
+          {
+            auto partitioner = [&] (size_t lower, size_t upper) -> void {
+              try {
+                std::unordered_map<uint64_t, DocumentsPerBucket> partitions;
+
+                for (size_t i = lower; i < upper; ++i) {
+                  uint64_t hash = _hashKey(elements[i].first);
+                  auto bucketId = hash & _bucketsMask;
+
+                  auto it = partitions.find(bucketId);
+
+                  if (it == partitions.end()) {
+                    it = partitions.emplace(bucketId, DocumentsPerBucket()).first;
+                  }
+
+                  (*it).second.emplace_back(std::make_pair(elements[i], hash));
+                }
+
+                // transfer ownership to the central map
+                MUTEX_LOCKER(bucketMapLocker);
+
+                for (auto& it : partitions) {
+                  auto it2 = allBuckets.find(it.first);
+
+                  if (it2 == allBuckets.end()) {
+                    it2 = allBuckets.emplace(it.first, std::vector<DocumentsPerBucket>()).first;
+                  }
+
+                  (*it2).second.emplace_back(std::move(it.second));
+                }
+              }
+              catch (...) {
+                res = TRI_ERROR_INTERNAL;
+              }
+            };
+
+            std::vector<std::thread> threads;
+            threads.reserve(numThreads);
+
+            try {
+              for (size_t i = 0; i < numThreads; ++i) {
+                size_t lower = i * chunkSize;
+                size_t upper = (i + 1) * chunkSize;
+
+                if (i + 1 == numThreads) {
+                  // last chunk. account for potential rounding errors
+                  upper = elements.size();
+                }
+                else if (upper > elements.size()) {
+                  upper = elements.size();
+                }
+
+                threads.emplace_back(std::thread(partitioner, lower, upper));
+              }
+            }
+            catch (...) {
+              res = TRI_ERROR_INTERNAL;
+            }
+
+            for (size_t i = 0; i < threads.size(); ++i) {
+              // must join threads, otherwise the program will crash
+              threads[i].join();
+            }
+          }
+
+          if (res.load() != TRI_ERROR_NO_ERROR) {
+            return res.load();
+          }
+
+          // now the data is partitioned...
+          // now insert the bucket data in parallel
+          {
+            auto inserter = [&] (size_t chunk) -> void {
+              try {
+                for (auto const& it : allBuckets) {
+                  uint64_t bucketId = it.first;
+
+                  if (bucketId % numThreads != chunk) {
+                    // we're not responsible for this bucket!
+                    continue;
+                  }
+
+                  // we're responsible for this bucket!
+                  Bucket& b = _buckets[bucketId];
+                  uint64_t expected = 0;
+
+                  for (auto const& it2 : it.second) {
+                    expected += it2.size();
+                  }
+
+                  if (! checkResize(b, expected)) {
+                    res = TRI_ERROR_OUT_OF_MEMORY;
+                    return;
+                  }
+
+                  for (auto const& it2 : it.second) {
+                    for (auto const& it3 : it2) {
+                      doInsert(it3.first.first, const_cast<Element*>(it3.first.second), b, it3.second);
+                    }
+                  }
+                }
+              }
+              catch (...) {
+                res = TRI_ERROR_INTERNAL;
+              }
+            };
+
+            std::vector<std::thread> threads;
+            threads.reserve(numThreads);
+
+            try {
+              for (size_t i = 0; i < numThreads; ++i) {
+                threads.emplace_back(std::thread(inserter, i));
+              }
+            }
+            catch (...) {
+              res = TRI_ERROR_INTERNAL;
+            }
+
+            for (size_t i = 0; i < threads.size(); ++i) {
+              // must join threads, otherwise the program will crash
+              threads[i].join();
+            }
+          }
+
+          return res.load();
+        }
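
Note: the batchInsert() added above works in two phases. Worker threads first partition their slice of the input by destination bucket (hash & _bucketsMask) into a mutex-protected map, then a second wave of threads performs the inserts, with bucket b handled only by the thread where b % numThreads == chunk, so no two threads ever write to the same bucket. Below is a self-contained sketch of that partition-then-insert pattern using plain standard-library containers and std::thread (toy data and toy hash, not the ArangoDB class):

// Standalone sketch of the partition-then-insert pattern behind batchInsert():
// phase 1 groups items by bucket under a mutex, phase 2 lets each thread
// insert only the buckets it owns (bucketId % numThreads == chunk).
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

int main () {
  size_t const numThreads = 2;
  size_t const numBuckets = 4;             // power of two, so masking works
  uint64_t const bucketsMask = numBuckets - 1;

  std::vector<uint64_t> input{1, 5, 9, 2, 6, 3, 7, 11, 4, 8};

  std::mutex mapLock;
  std::unordered_map<uint64_t, std::vector<uint64_t>> allBuckets;

  // phase 1: partition the input by destination bucket
  auto partitioner = [&] (size_t lower, size_t upper) {
    std::unordered_map<uint64_t, std::vector<uint64_t>> local;
    for (size_t i = lower; i < upper; ++i) {
      uint64_t hash = input[i];            // toy hash
      local[hash & bucketsMask].push_back(input[i]);
    }
    std::lock_guard<std::mutex> guard(mapLock);   // transfer to the shared map
    for (auto& it : local) {
      auto& dest = allBuckets[it.first];
      dest.insert(dest.end(), it.second.begin(), it.second.end());
    }
  };

  // phase 2: each thread inserts only the buckets it is responsible for
  std::vector<std::vector<uint64_t>> buckets(numBuckets);
  auto inserter = [&] (size_t chunk) {
    for (auto const& it : allBuckets) {
      if (it.first % numThreads != chunk) {
        continue;                          // another thread owns this bucket
      }
      for (uint64_t value : it.second) {
        buckets[it.first].push_back(value);
      }
    }
  };

  std::vector<std::thread> threads;
  size_t const chunkSize = input.size() / numThreads;
  for (size_t i = 0; i < numThreads; ++i) {
    size_t upper = (i + 1 == numThreads) ? input.size() : (i + 1) * chunkSize;
    threads.emplace_back(partitioner, i * chunkSize, upper);
  }
  for (auto& t : threads) t.join();        // the partition phase must finish first
  threads.clear();

  for (size_t i = 0; i < numThreads; ++i) {
    threads.emplace_back(inserter, i);
  }
  for (auto& t : threads) t.join();

  for (size_t b = 0; b < numBuckets; ++b) {
    std::cout << "bucket " << b << " got " << buckets[b].size() << " items\n";
  }
  return 0;
}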
////////////////////////////////////////////////////////////////////////////////