1
0
Fork 0

Merge branch 'devel' of github.com:arangodb/arangodb into devel

This commit is contained in:
Michael Hackstein 2017-04-25 15:56:48 +02:00
commit 1dda903834
16 changed files with 41 additions and 40 deletions

View File

@ -514,7 +514,7 @@ void Index::batchInsert(
transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&
documents,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
for (auto const& it : documents) {
int status = insert(trx, it.first, it.second, false);
if (status != TRI_ERROR_NO_ERROR) {

View File

@ -250,7 +250,7 @@ class Index {
virtual void batchInsert(
transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
arangodb::basics::LocalTaskQueue* queue = nullptr);
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue);
virtual int unload() = 0;

View File

@ -73,7 +73,7 @@ namespace {
class MMFilesIndexFillerTask : public basics::LocalTask {
public:
MMFilesIndexFillerTask(
basics::LocalTaskQueue* queue, transaction::Methods* trx, Index* idx,
std::shared_ptr<basics::LocalTaskQueue> queue, transaction::Methods* trx, Index* idx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents)
: LocalTask(queue), _trx(trx), _idx(idx), _documents(documents) {}
@ -1464,7 +1464,7 @@ bool MMFilesCollection::openIndex(VPackSlice const& description,
/// @brief initializes an index with a set of existing documents
void MMFilesCollection::fillIndex(
arangodb::basics::LocalTaskQueue* queue, transaction::Methods* trx,
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue, transaction::Methods* trx,
arangodb::Index* idx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
bool skipPersistent) {
@ -1554,12 +1554,13 @@ int MMFilesCollection::fillIndexes(
TRI_ASSERT(SchedulerFeature::SCHEDULER != nullptr);
auto ioService = SchedulerFeature::SCHEDULER->ioService();
TRI_ASSERT(ioService != nullptr);
arangodb::basics::LocalTaskQueue queue(ioService);
PerformanceLogScope logScope(
std::string("fill-indexes-document-collection { collection: ") +
_logicalCollection->vocbase()->name() + "/" + _logicalCollection->name() +
" }, indexes: " + std::to_string(n - 1));
auto queue = std::make_shared<arangodb::basics::LocalTaskQueue>(ioService);
try {
TRI_ASSERT(!ServerState::instance()->isCoordinator());
@ -1594,12 +1595,12 @@ int MMFilesCollection::fillIndexes(
if (idx->type() == Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX) {
continue;
}
fillIndex(&queue, trx, idx.get(), documents, skipPersistent);
fillIndex(queue, trx, idx.get(), documents, skipPersistent);
}
queue.dispatchAndWait();
queue->dispatchAndWait();
if (queue.status() != TRI_ERROR_NO_ERROR) {
if (queue->status() != TRI_ERROR_NO_ERROR) {
rollbackAll();
rolledBack = true;
}
@ -1626,7 +1627,7 @@ int MMFilesCollection::fillIndexes(
if (documents.size() == blockSize) {
// now actually fill the secondary indexes
insertInAllIndexes();
if (queue.status() != TRI_ERROR_NO_ERROR) {
if (queue->status() != TRI_ERROR_NO_ERROR) {
break;
}
documents.clear();
@ -1636,33 +1637,33 @@ int MMFilesCollection::fillIndexes(
}
// process the remainder of the documents
if (queue.status() == TRI_ERROR_NO_ERROR && !documents.empty()) {
if (queue->status() == TRI_ERROR_NO_ERROR && !documents.empty()) {
insertInAllIndexes();
}
} catch (arangodb::basics::Exception const& ex) {
queue.setStatus(ex.code());
queue->setStatus(ex.code());
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
<< "caught exception while filling indexes: " << ex.what();
} catch (std::bad_alloc const&) {
queue.setStatus(TRI_ERROR_OUT_OF_MEMORY);
queue->setStatus(TRI_ERROR_OUT_OF_MEMORY);
} catch (std::exception const& ex) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
<< "caught exception while filling indexes: " << ex.what();
queue.setStatus(TRI_ERROR_INTERNAL);
queue->setStatus(TRI_ERROR_INTERNAL);
} catch (...) {
LOG_TOPIC(WARN, arangodb::Logger::FIXME)
<< "caught unknown exception while filling indexes";
queue.setStatus(TRI_ERROR_INTERNAL);
queue->setStatus(TRI_ERROR_INTERNAL);
}
if (queue.status() != TRI_ERROR_NO_ERROR && !rolledBack) {
if (queue->status() != TRI_ERROR_NO_ERROR && !rolledBack) {
try {
rollbackAll();
} catch (...) {
}
}
return queue.status();
return queue->status();
}
/// @brief opens an existing collection

View File

@ -398,7 +398,7 @@ class MMFilesCollection final : public PhysicalCollection {
bool openIndex(VPackSlice const& description, transaction::Methods* trx);
/// @brief initializes an index with all existing documents
void fillIndex(basics::LocalTaskQueue*, transaction::Methods*, Index*,
void fillIndex(std::shared_ptr<basics::LocalTaskQueue>, transaction::Methods*, Index*,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const&,
bool);

View File

@ -329,7 +329,7 @@ int MMFilesEdgeIndex::remove(transaction::Methods* trx,
void MMFilesEdgeIndex::batchInsert(
transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
if (documents.empty()) {
return;
}

View File

@ -111,7 +111,7 @@ class MMFilesEdgeIndex final : public Index {
void batchInsert(transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const&,
arangodb::basics::LocalTaskQueue*) override;
std::shared_ptr<arangodb::basics::LocalTaskQueue>) override;
int unload() override;

View File

@ -644,7 +644,7 @@ int MMFilesHashIndex::remove(transaction::Methods* trx,
void MMFilesHashIndex::batchInsert(
transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
TRI_ASSERT(queue != nullptr);
if (_unique) {
batchInsertUnique(trx, documents, queue);
@ -760,7 +760,7 @@ int MMFilesHashIndex::insertUnique(transaction::Methods* trx,
void MMFilesHashIndex::batchInsertUnique(
transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
TRI_ASSERT(queue != nullptr);
std::shared_ptr<std::vector<MMFilesHashIndexElement*>> elements;
elements.reset(new std::vector<MMFilesHashIndexElement*>());
@ -880,7 +880,7 @@ int MMFilesHashIndex::insertMulti(transaction::Methods* trx,
void MMFilesHashIndex::batchInsertMulti(
transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
TRI_ASSERT(queue != nullptr);
std::shared_ptr<std::vector<MMFilesHashIndexElement*>> elements;
elements.reset(new std::vector<MMFilesHashIndexElement*>());

View File

@ -173,7 +173,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
void batchInsert(
transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
arangodb::basics::LocalTaskQueue* queue = nullptr) override;
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) override;
int unload() override;
@ -205,7 +205,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
void batchInsertUnique(
transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
arangodb::basics::LocalTaskQueue* queue = nullptr);
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue);
int insertMulti(transaction::Methods*, TRI_voc_rid_t,
arangodb::velocypack::Slice const&, bool isRollback);
@ -213,7 +213,7 @@ class MMFilesHashIndex final : public MMFilesPathBasedIndex {
void batchInsertMulti(
transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
arangodb::basics::LocalTaskQueue* queue = nullptr);
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue);
int removeUniqueElement(transaction::Methods*, MMFilesHashIndexElement*,
bool);

View File

@ -251,7 +251,7 @@ int RocksDBEdgeIndex::remove(transaction::Methods* trx,
void RocksDBEdgeIndex::batchInsert(
transaction::Methods* trx,
std::vector<std::pair<TRI_voc_rid_t, VPackSlice>> const& documents,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
// acquire rocksdb transaction
RocksDBTransactionState* state = rocksutils::toRocksTransactionState(trx);
rocksdb::Transaction* rtrx = state->rocksTransaction();

View File

@ -111,7 +111,7 @@ class RocksDBEdgeIndex final : public RocksDBIndex {
void batchInsert(
transaction::Methods*,
std::vector<std::pair<TRI_voc_rid_t, arangodb::velocypack::Slice>> const&,
arangodb::basics::LocalTaskQueue* queue = nullptr) override;
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) override;
int drop() override;

View File

@ -292,7 +292,7 @@ class AssocMulti {
void batchInsert(std::function<void*()> const& contextCreator,
std::function<void(void*)> const& contextDestroyer,
std::shared_ptr<std::vector<Element> const> data,
LocalTaskQueue* queue) {
std::shared_ptr<LocalTaskQueue> queue) {
if (data->empty()) {
// nothing to do
return;

View File

@ -94,7 +94,7 @@ class MultiInserterTask : public LocalTask {
public:
MultiInserterTask(
LocalTaskQueue* queue, std::function<void(void*)> contextDestroyer,
std::shared_ptr<LocalTaskQueue> queue, std::function<void(void*)> contextDestroyer,
std::vector<Bucket>* buckets,
std::function<Element(void*, Element const&, uint64_t, Bucket&,
bool const, bool const)>
@ -168,7 +168,7 @@ class MultiPartitionerTask : public LocalTask {
public:
MultiPartitionerTask(
LocalTaskQueue* queue,
std::shared_ptr<LocalTaskQueue> queue,
std::function<uint64_t(void*, Element const&, bool)> hashElement,
std::function<void(void*)> const& contextDestroyer,
std::shared_ptr<std::vector<Element> const> data, size_t lower,

View File

@ -553,7 +553,7 @@ class AssocUnique {
void batchInsert(std::function<void*()> const& contextCreator,
std::function<void(void*)> const& contextDestroyer,
std::shared_ptr<std::vector<Element> const> data,
arangodb::basics::LocalTaskQueue* queue) {
std::shared_ptr<arangodb::basics::LocalTaskQueue> queue) {
TRI_ASSERT(queue != nullptr);
if (data->empty()) {
// nothing to do

View File

@ -70,7 +70,7 @@ class UniqueInserterTask : public LocalTask {
public:
UniqueInserterTask(
LocalTaskQueue* queue, std::function<void(void*)> contextDestroyer,
std::shared_ptr<LocalTaskQueue> queue, std::function<void(void*)> contextDestroyer,
std::vector<Bucket>* buckets,
std::function<int(void*, Element const&, Bucket&, uint64_t)> doInsert,
std::function<bool(void*, Bucket&, uint64_t)> checkResize, size_t i,
@ -140,7 +140,7 @@ class UniquePartitionerTask : public LocalTask {
public:
UniquePartitionerTask(
LocalTaskQueue* queue,
std::shared_ptr<LocalTaskQueue> queue,
std::function<uint64_t(void*, Element const&)> hashElement,
std::function<void(void*)> const& contextDestroyer,
std::shared_ptr<std::vector<Element> const> data, size_t lower,

View File

@ -34,7 +34,7 @@ using namespace arangodb::basics;
/// @brief create a task tied to the specified queue
////////////////////////////////////////////////////////////////////////////////
LocalTask::LocalTask(LocalTaskQueue* queue) : _queue(queue) {}
LocalTask::LocalTask(std::shared_ptr<LocalTaskQueue> queue) : _queue(queue) {}
////////////////////////////////////////////////////////////////////////////////
/// @brief dispatch this task to the underlying io_service
@ -58,7 +58,7 @@ void LocalTask::dispatch() {
/// @brief create a callback task tied to the specified queue
////////////////////////////////////////////////////////////////////////////////
LocalCallbackTask::LocalCallbackTask(LocalTaskQueue* queue,
LocalCallbackTask::LocalCallbackTask(std::shared_ptr<LocalTaskQueue> queue,
std::function<void()> cb)
: _queue(queue), _cb(cb) {}

View File

@ -43,7 +43,7 @@ class LocalTask : public std::enable_shared_from_this<LocalTask> {
LocalTask(LocalTask const&) = delete;
LocalTask& operator=(LocalTask const&) = delete;
explicit LocalTask(LocalTaskQueue* queue);
explicit LocalTask(std::shared_ptr<LocalTaskQueue> queue);
virtual ~LocalTask() {}
virtual void run() = 0;
@ -54,7 +54,7 @@ class LocalTask : public std::enable_shared_from_this<LocalTask> {
/// @brief the underlying queue
//////////////////////////////////////////////////////////////////////////////
LocalTaskQueue* _queue;
std::shared_ptr<LocalTaskQueue> _queue;
};
class LocalCallbackTask
@ -64,7 +64,7 @@ class LocalCallbackTask
LocalCallbackTask(LocalCallbackTask const&) = delete;
LocalCallbackTask& operator=(LocalCallbackTask const&) = delete;
LocalCallbackTask(LocalTaskQueue* queue, std::function<void()> cb);
LocalCallbackTask(std::shared_ptr<LocalTaskQueue> queue, std::function<void()> cb);
virtual ~LocalCallbackTask() {}
virtual void run();
@ -75,7 +75,7 @@ class LocalCallbackTask
/// @brief the underlying queue
//////////////////////////////////////////////////////////////////////////////
LocalTaskQueue* _queue;
std::shared_ptr<LocalTaskQueue> _queue;
//////////////////////////////////////////////////////////////////////////////
/// @brief the callback executed by run() (any exceptions will be caught and