From 7344a036237d3a9ca1bbf49dc33928b45058e851 Mon Sep 17 00:00:00 2001 From: jsteemann Date: Tue, 25 Apr 2017 15:47:11 +0200 Subject: [PATCH] fix crashes --- arangod/Indexes/Index.cpp | 2 +- arangod/Indexes/Index.h | 2 +- arangod/MMFiles/MMFilesCollection.cpp | 31 +++++++++++----------- arangod/MMFiles/MMFilesCollection.h | 2 +- arangod/MMFiles/MMFilesEdgeIndex.cpp | 2 +- arangod/MMFiles/MMFilesEdgeIndex.h | 2 +- arangod/MMFiles/MMFilesHashIndex.cpp | 6 ++--- arangod/MMFiles/MMFilesHashIndex.h | 6 ++--- arangod/RocksDBEngine/RocksDBEdgeIndex.cpp | 2 +- arangod/RocksDBEngine/RocksDBEdgeIndex.h | 2 +- lib/Basics/AssocMulti.h | 2 +- lib/Basics/AssocMultiHelpers.h | 4 +-- lib/Basics/AssocUnique.h | 2 +- lib/Basics/AssocUniqueHelpers.h | 4 +-- lib/Basics/LocalTaskQueue.cpp | 4 +-- lib/Basics/LocalTaskQueue.h | 8 +++--- 16 files changed, 41 insertions(+), 40 deletions(-) diff --git a/arangod/Indexes/Index.cpp b/arangod/Indexes/Index.cpp index b34f480615..98381953df 100644 --- a/arangod/Indexes/Index.cpp +++ b/arangod/Indexes/Index.cpp @@ -514,7 +514,7 @@ void Index::batchInsert( transaction::Methods* trx, std::vector> const& documents, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { for (auto const& it : documents) { int status = insert(trx, it.first, it.second, false); if (status != TRI_ERROR_NO_ERROR) { diff --git a/arangod/Indexes/Index.h b/arangod/Indexes/Index.h index 9481f512ce..831dc7fef1 100644 --- a/arangod/Indexes/Index.h +++ b/arangod/Indexes/Index.h @@ -250,7 +250,7 @@ class Index { virtual void batchInsert( transaction::Methods*, std::vector> const&, - arangodb::basics::LocalTaskQueue* queue = nullptr); + std::shared_ptr queue); virtual int unload() = 0; diff --git a/arangod/MMFiles/MMFilesCollection.cpp b/arangod/MMFiles/MMFilesCollection.cpp index 9ea7272eee..484b4708cb 100644 --- a/arangod/MMFiles/MMFilesCollection.cpp +++ b/arangod/MMFiles/MMFilesCollection.cpp @@ -73,7 +73,7 @@ namespace { class MMFilesIndexFillerTask : public basics::LocalTask { public: MMFilesIndexFillerTask( - basics::LocalTaskQueue* queue, transaction::Methods* trx, Index* idx, + std::shared_ptr queue, transaction::Methods* trx, Index* idx, std::vector> const& documents) : LocalTask(queue), _trx(trx), _idx(idx), _documents(documents) {} @@ -1464,7 +1464,7 @@ bool MMFilesCollection::openIndex(VPackSlice const& description, /// @brief initializes an index with a set of existing documents void MMFilesCollection::fillIndex( - arangodb::basics::LocalTaskQueue* queue, transaction::Methods* trx, + std::shared_ptr queue, transaction::Methods* trx, arangodb::Index* idx, std::vector> const& documents, bool skipPersistent) { @@ -1554,12 +1554,13 @@ int MMFilesCollection::fillIndexes( TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr); auto ioService = SchedulerFeature::SCHEDULER->ioService(); TRI_ASSERT(ioService != nullptr); - arangodb::basics::LocalTaskQueue queue(ioService); - + PerformanceLogScope logScope( std::string("fill-indexes-document-collection { collection: ") + _logicalCollection->vocbase()->name() + "/" + _logicalCollection->name() + " }, indexes: " + std::to_string(n - 1)); + + auto queue = std::make_shared(ioService); try { TRI_ASSERT(!ServerState::instance()->isCoordinator()); @@ -1594,12 +1595,12 @@ int MMFilesCollection::fillIndexes( if (idx->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) { continue; } - fillIndex(&queue, trx, idx.get(), documents, skipPersistent); + fillIndex(queue, trx, idx.get(), documents, skipPersistent); } - queue.dispatchAndWait(); + queue->dispatchAndWait(); - if (queue.status() != TRI_ERROR_NO_ERROR) { + if (queue->status() != TRI_ERROR_NO_ERROR) { rollbackAll(); rolledBack = true; } @@ -1626,7 +1627,7 @@ int MMFilesCollection::fillIndexes( if (documents.size() == blockSize) { // now actually fill the secondary indexes insertInAllIndexes(); - if (queue.status() != TRI_ERROR_NO_ERROR) { + if (queue->status() != TRI_ERROR_NO_ERROR) { break; } documents.clear(); @@ -1636,33 +1637,33 @@ int MMFilesCollection::fillIndexes( } // process the remainder of the documents - if (queue.status() == TRI_ERROR_NO_ERROR && !documents.empty()) { + if (queue->status() == TRI_ERROR_NO_ERROR && !documents.empty()) { insertInAllIndexes(); } } catch (arangodb::basics::Exception const& ex) { - queue.setStatus(ex.code()); + queue->setStatus(ex.code()); LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "caught exception while filling indexes: " << ex.what(); } catch (std::bad_alloc const&) { - queue.setStatus(TRI_ERROR_OUT_OF_MEMORY); + queue->setStatus(TRI_ERROR_OUT_OF_MEMORY); } catch (std::exception const& ex) { LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "caught exception while filling indexes: " << ex.what(); - queue.setStatus(TRI_ERROR_INTERNAL); + queue->setStatus(TRI_ERROR_INTERNAL); } catch (...) { LOG_TOPIC(WARN, arangodb::Logger::FIXME) << "caught unknown exception while filling indexes"; - queue.setStatus(TRI_ERROR_INTERNAL); + queue->setStatus(TRI_ERROR_INTERNAL); } - if (queue.status() != TRI_ERROR_NO_ERROR && !rolledBack) { + if (queue->status() != TRI_ERROR_NO_ERROR && !rolledBack) { try { rollbackAll(); } catch (...) { } } - return queue.status(); + return queue->status(); } /// @brief opens an existing collection diff --git a/arangod/MMFiles/MMFilesCollection.h b/arangod/MMFiles/MMFilesCollection.h index dd186b0796..6d0d38b998 100644 --- a/arangod/MMFiles/MMFilesCollection.h +++ b/arangod/MMFiles/MMFilesCollection.h @@ -398,7 +398,7 @@ class MMFilesCollection final : public PhysicalCollection { bool openIndex(VPackSlice const& description, transaction::Methods* trx); /// @brief initializes an index with all existing documents - void fillIndex(basics::LocalTaskQueue*, transaction::Methods*, Index*, + void fillIndex(std::shared_ptr, transaction::Methods*, Index*, std::vector> const&, bool); diff --git a/arangod/MMFiles/MMFilesEdgeIndex.cpp b/arangod/MMFiles/MMFilesEdgeIndex.cpp index dd70f6e5cd..c9c15b485a 100644 --- a/arangod/MMFiles/MMFilesEdgeIndex.cpp +++ b/arangod/MMFiles/MMFilesEdgeIndex.cpp @@ -329,7 +329,7 @@ int MMFilesEdgeIndex::remove(transaction::Methods* trx, void MMFilesEdgeIndex::batchInsert( transaction::Methods* trx, std::vector> const& documents, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { if (documents.empty()) { return; } diff --git a/arangod/MMFiles/MMFilesEdgeIndex.h b/arangod/MMFiles/MMFilesEdgeIndex.h index e42a84a22e..86bfdd6883 100644 --- a/arangod/MMFiles/MMFilesEdgeIndex.h +++ b/arangod/MMFiles/MMFilesEdgeIndex.h @@ -111,7 +111,7 @@ class MMFilesEdgeIndex final : public Index { void batchInsert(transaction::Methods*, std::vector> const&, - arangodb::basics::LocalTaskQueue*) override; + std::shared_ptr) override; int unload() override; diff --git a/arangod/MMFiles/MMFilesHashIndex.cpp b/arangod/MMFiles/MMFilesHashIndex.cpp index 74c3a2c2da..38a1490481 100644 --- a/arangod/MMFiles/MMFilesHashIndex.cpp +++ b/arangod/MMFiles/MMFilesHashIndex.cpp @@ -644,7 +644,7 @@ int MMFilesHashIndex::remove(transaction::Methods* trx, void MMFilesHashIndex::batchInsert( transaction::Methods* trx, std::vector> const& documents, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { TRI_ASSERT(queue != nullptr); if (_unique) { batchInsertUnique(trx, documents, queue); @@ -760,7 +760,7 @@ int MMFilesHashIndex::insertUnique(transaction::Methods* trx, void MMFilesHashIndex::batchInsertUnique( transaction::Methods* trx, std::vector> const& documents, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { TRI_ASSERT(queue != nullptr); std::shared_ptr> elements; elements.reset(new std::vector()); @@ -880,7 +880,7 @@ int MMFilesHashIndex::insertMulti(transaction::Methods* trx, void MMFilesHashIndex::batchInsertMulti( transaction::Methods* trx, std::vector> const& documents, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { TRI_ASSERT(queue != nullptr); std::shared_ptr> elements; elements.reset(new std::vector()); diff --git a/arangod/MMFiles/MMFilesHashIndex.h b/arangod/MMFiles/MMFilesHashIndex.h index 95a02cf287..73561faccf 100644 --- a/arangod/MMFiles/MMFilesHashIndex.h +++ b/arangod/MMFiles/MMFilesHashIndex.h @@ -173,7 +173,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex { void batchInsert( transaction::Methods*, std::vector> const&, - arangodb::basics::LocalTaskQueue* queue = nullptr) override; + std::shared_ptr queue) override; int unload() override; @@ -205,7 +205,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex { void batchInsertUnique( transaction::Methods*, std::vector> const&, - arangodb::basics::LocalTaskQueue* queue = nullptr); + std::shared_ptr queue); int insertMulti(transaction::Methods*, TRI_voc_rid_t, arangodb::velocypack::Slice const&, bool isRollback); @@ -213,7 +213,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex { void batchInsertMulti( transaction::Methods*, std::vector> const&, - arangodb::basics::LocalTaskQueue* queue = nullptr); + std::shared_ptr queue); int removeUniqueElement(transaction::Methods*, MMFilesHashIndexElement*, bool); diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp index df9e6c8213..c1ade6cf77 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.cpp @@ -251,7 +251,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx, void RocksDBEdgeIndex::batchInsert( transaction::Methods* trx, std::vector> const& documents, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { // acquire rocksdb transaction RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx); rocksdb::Transaction* rtrx = state->rocksTransaction(); diff --git a/arangod/RocksDBEngine/RocksDBEdgeIndex.h b/arangod/RocksDBEngine/RocksDBEdgeIndex.h index af771a60a0..180def9ed1 100644 --- a/arangod/RocksDBEngine/RocksDBEdgeIndex.h +++ b/arangod/RocksDBEngine/RocksDBEdgeIndex.h @@ -111,7 +111,7 @@ class RocksDBEdgeIndex final : public RocksDBIndex { void batchInsert( transaction::Methods*, std::vector> const&, - arangodb::basics::LocalTaskQueue* queue = nullptr) override; + std::shared_ptr queue) override; int drop() override; diff --git a/lib/Basics/AssocMulti.h b/lib/Basics/AssocMulti.h index 061b11f2ee..900fb24889 100644 --- a/lib/Basics/AssocMulti.h +++ b/lib/Basics/AssocMulti.h @@ -292,7 +292,7 @@ class AssocMulti { void batchInsert(std::function const& contextCreator, std::function const& contextDestroyer, std::shared_ptr const> data, - LocalTaskQueue* queue) { + std::shared_ptr queue) { if (data->empty()) { // nothing to do return; diff --git a/lib/Basics/AssocMultiHelpers.h b/lib/Basics/AssocMultiHelpers.h index 1548ebadec..600c11f203 100644 --- a/lib/Basics/AssocMultiHelpers.h +++ b/lib/Basics/AssocMultiHelpers.h @@ -94,7 +94,7 @@ class MultiInserterTask : public LocalTask { public: MultiInserterTask( - LocalTaskQueue* queue, std::function contextDestroyer, + std::shared_ptr queue, std::function contextDestroyer, std::vector* buckets, std::function @@ -168,7 +168,7 @@ class MultiPartitionerTask : public LocalTask { public: MultiPartitionerTask( - LocalTaskQueue* queue, + std::shared_ptr queue, std::function hashElement, std::function const& contextDestroyer, std::shared_ptr const> data, size_t lower, diff --git a/lib/Basics/AssocUnique.h b/lib/Basics/AssocUnique.h index 8e686fe4da..c4d6111d99 100644 --- a/lib/Basics/AssocUnique.h +++ b/lib/Basics/AssocUnique.h @@ -553,7 +553,7 @@ class AssocUnique { void batchInsert(std::function const& contextCreator, std::function const& contextDestroyer, std::shared_ptr const> data, - arangodb::basics::LocalTaskQueue* queue) { + std::shared_ptr queue) { TRI_ASSERT(queue != nullptr); if (data->empty()) { // nothing to do diff --git a/lib/Basics/AssocUniqueHelpers.h b/lib/Basics/AssocUniqueHelpers.h index b51e0cb40a..25148f1caf 100644 --- a/lib/Basics/AssocUniqueHelpers.h +++ b/lib/Basics/AssocUniqueHelpers.h @@ -70,7 +70,7 @@ class UniqueInserterTask : public LocalTask { public: UniqueInserterTask( - LocalTaskQueue* queue, std::function contextDestroyer, + std::shared_ptr queue, std::function contextDestroyer, std::vector* buckets, std::function doInsert, std::function checkResize, size_t i, @@ -140,7 +140,7 @@ class UniquePartitionerTask : public LocalTask { public: UniquePartitionerTask( - LocalTaskQueue* queue, + std::shared_ptr queue, std::function hashElement, std::function const& contextDestroyer, std::shared_ptr const> data, size_t lower, diff --git a/lib/Basics/LocalTaskQueue.cpp b/lib/Basics/LocalTaskQueue.cpp index d11dae928f..214e5b86d6 100644 --- a/lib/Basics/LocalTaskQueue.cpp +++ b/lib/Basics/LocalTaskQueue.cpp @@ -34,7 +34,7 @@ using namespace arangodb::basics; /// @brief create a task tied to the specified queue //////////////////////////////////////////////////////////////////////////////// -LocalTask::LocalTask(LocalTaskQueue* queue) : _queue(queue) {} +LocalTask::LocalTask(std::shared_ptr queue) : _queue(queue) {} //////////////////////////////////////////////////////////////////////////////// /// @brief dispatch this task to the underlying io_service @@ -58,7 +58,7 @@ void LocalTask::dispatch() { /// @brief create a callback task tied to the specified queue //////////////////////////////////////////////////////////////////////////////// -LocalCallbackTask::LocalCallbackTask(LocalTaskQueue* queue, +LocalCallbackTask::LocalCallbackTask(std::shared_ptr queue, std::function cb) : _queue(queue), _cb(cb) {} diff --git a/lib/Basics/LocalTaskQueue.h b/lib/Basics/LocalTaskQueue.h index 92de02e939..d48ae2cc1d 100644 --- a/lib/Basics/LocalTaskQueue.h +++ b/lib/Basics/LocalTaskQueue.h @@ -43,7 +43,7 @@ class LocalTask : public std::enable_shared_from_this { LocalTask(LocalTask const&) = delete; LocalTask& operator=(LocalTask const&) = delete; - explicit LocalTask(LocalTaskQueue* queue); + explicit LocalTask(std::shared_ptr queue); virtual ~LocalTask() {} virtual void run() = 0; @@ -54,7 +54,7 @@ class LocalTask : public std::enable_shared_from_this { /// @brief the underlying queue ////////////////////////////////////////////////////////////////////////////// - LocalTaskQueue* _queue; + std::shared_ptr _queue; }; class LocalCallbackTask @@ -64,7 +64,7 @@ class LocalCallbackTask LocalCallbackTask(LocalCallbackTask const&) = delete; LocalCallbackTask& operator=(LocalCallbackTask const&) = delete; - LocalCallbackTask(LocalTaskQueue* queue, std::function cb); + LocalCallbackTask(std::shared_ptr queue, std::function cb); virtual ~LocalCallbackTask() {} virtual void run(); @@ -75,7 +75,7 @@ class LocalCallbackTask /// @brief the underlying queue ////////////////////////////////////////////////////////////////////////////// - LocalTaskQueue* _queue; + std::shared_ptr _queue; ////////////////////////////////////////////////////////////////////////////// /// @brief the callback executed by run() (any exceptions will be caught and