//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany /// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Dr. Frank Celler /// @author Achim Brandt //////////////////////////////////////////////////////////////////////////////// #ifndef ARANGODB_IMPORT_IMPORT_HELPER_H #define ARANGODB_IMPORT_IMPORT_HELPER_H 1 #include #include "AutoTuneThread.h" #include "QuickHistogram.h" #include "Basics/Common.h" #include "Basics/ConditionVariable.h" #include "Basics/Mutex.h" #include "Basics/StringBuffer.h" #include "Basics/csv.h" #ifdef _WIN32 #include "Basics/win-utils.h" #endif namespace arangodb { class ClientFeature; namespace httpclient { class SimpleHttpClient; class SimpleHttpResult; struct SimpleHttpClientParams; } // namespace httpclient } // namespace arangodb //////////////////////////////////////////////////////////////////////////////// /// @brief class for http requests //////////////////////////////////////////////////////////////////////////////// namespace arangodb { namespace import { class SenderThread; struct ImportStatistics { size_t _numberCreated = 0; size_t _numberErrors = 0; size_t _numberUpdated = 0; size_t _numberIgnored = 0; arangodb::Mutex _mutex; QuickHistogram _histogram; }; class ImportHelper { public: ////////////////////////////////////////////////////////////////////////////// /// @brief type of delimited import ////////////////////////////////////////////////////////////////////////////// enum DelimitedImportType { CSV = 0, TSV }; private: ImportHelper(ImportHelper const&) = delete; ImportHelper& operator=(ImportHelper const&) = delete; public: ImportHelper(ClientFeature const* client, std::string const& endpoint, httpclient::SimpleHttpClientParams const& params, uint64_t maxUploadSize, uint32_t threadCount, bool autoUploadSize = false); ~ImportHelper(); ////////////////////////////////////////////////////////////////////////////// /// @brief imports a delimited file ////////////////////////////////////////////////////////////////////////////// bool importDelimited(std::string const& collectionName, std::string const& fileName, DelimitedImportType typeImport); ////////////////////////////////////////////////////////////////////////////// /// @brief imports a file with JSON objects /// each line must contain a complete JSON object ////////////////////////////////////////////////////////////////////////////// bool importJson(std::string const& collectionName, std::string const& fileName, bool assumeLinewise); ////////////////////////////////////////////////////////////////////////////// /// @brief sets the action to carry out on duplicate _key ////////////////////////////////////////////////////////////////////////////// void setOnDuplicateAction(std::string const& action) { _onDuplicateAction = action; } ////////////////////////////////////////////////////////////////////////////// /// @brief sets the quote character /// /// this is a string because the quote might also be empty if not used ////////////////////////////////////////////////////////////////////////////// void setQuote(std::string const& quote) { _quote = quote; } ////////////////////////////////////////////////////////////////////////////// /// @brief set collection name prefix for _from ////////////////////////////////////////////////////////////////////////////// void setFrom(std::string const& from) { _fromCollectionPrefix = from; } ////////////////////////////////////////////////////////////////////////////// /// @brief set collection name prefix for _to ////////////////////////////////////////////////////////////////////////////// void setTo(std::string const& to) { _toCollectionPrefix = to; } ////////////////////////////////////////////////////////////////////////////// /// @brief whether or not backslashes can be used for escaping quotes ////////////////////////////////////////////////////////////////////////////// void useBackslash(bool value) { _useBackslash = value; } /// @brief whether or not missing values in CSV files should be ignored void ignoreMissing(bool value) { _ignoreMissing = value; } ////////////////////////////////////////////////////////////////////////////// /// @brief sets the separator ////////////////////////////////////////////////////////////////////////////// void setSeparator(std::string const& separator) { _separator = separator; } ////////////////////////////////////////////////////////////////////////////// /// @brief sets the createCollection flag /// /// @param bool value create the collection if it does not /// exist ////////////////////////////////////////////////////////////////////////////// void setCreateCollection(bool value) { _createCollection = value; } void setCreateCollectionType(std::string const& value) { _createCollectionType = value; } void setTranslations(std::unordered_map const& translations) { _translations = translations; } void setRemoveAttributes(std::vector const& attr) { for (std::string const& str : attr) { _removeAttributes.insert(str); } } ////////////////////////////////////////////////////////////////////////////// /// @brief whether or not to overwrite existing data in the collection ////////////////////////////////////////////////////////////////////////////// void setOverwrite(bool value) { _overwrite = value; } ////////////////////////////////////////////////////////////////////////////// /// @brief set the number of rows to skip ////////////////////////////////////////////////////////////////////////////// void setRowsToSkip(size_t value) { _rowsToSkip = value; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of rows to skip ////////////////////////////////////////////////////////////////////////////// size_t getRowsToSkip() const { return _rowsToSkip; } ////////////////////////////////////////////////////////////////////////////// /// @brief whether or not to convert strings that contain "null", "false", /// "true" or that look like numbers into those types ////////////////////////////////////////////////////////////////////////////// void setConversion(bool value) { _convert = value; } ////////////////////////////////////////////////////////////////////////////// /// @brief set the progress indicator ////////////////////////////////////////////////////////////////////////////// void setProgress(bool value) { _progress = value; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of lines read (meaningful for CSV only) ////////////////////////////////////////////////////////////////////////////// size_t getReadLines() { return _numberLines; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of documents imported ////////////////////////////////////////////////////////////////////////////// size_t getNumberCreated() { return _stats._numberCreated; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of errors ////////////////////////////////////////////////////////////////////////////// size_t getNumberErrors() { return _stats._numberErrors; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of updated documents ////////////////////////////////////////////////////////////////////////////// size_t getNumberUpdated() { return _stats._numberUpdated; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of ignored documents ////////////////////////////////////////////////////////////////////////////// size_t getNumberIgnored() const { return _stats._numberIgnored; } ////////////////////////////////////////////////////////////////////////////// /// @brief increase the row counter ////////////////////////////////////////////////////////////////////////////// void incRowsRead() { ++_rowsRead; } ////////////////////////////////////////////////////////////////////////////// /// @brief get the number of rows read ////////////////////////////////////////////////////////////////////////////// size_t getRowsRead() const { return _rowsRead; } ////////////////////////////////////////////////////////////////////////////// /// @brief start the optional histogram thread ////////////////////////////////////////////////////////////////////////////// void startHistogram() { _stats._histogram.start(); } ////////////////////////////////////////////////////////////////////////////// /// @brief get the error message /// /// @return string get the error message ////////////////////////////////////////////////////////////////////////////// std::vector getErrorMessages() { return _errorMessages; } uint64_t getMaxUploadSize() { return (_maxUploadSize.load()); } void setMaxUploadSize(uint64_t newSize) { _maxUploadSize.store(newSize); } uint64_t rotatePeriodByteCount() { return (_periodByteCount.exchange(0)); } void addPeriodByteCount(uint64_t add) { _periodByteCount.fetch_add(add); } uint32_t getThreadCount() const { return _threadCount; } static unsigned const MaxBatchSize; private: static void ProcessCsvBegin(TRI_csv_parser_t*, size_t); static void ProcessCsvAdd(TRI_csv_parser_t*, char const*, size_t, size_t, size_t, bool); static void ProcessCsvEnd(TRI_csv_parser_t*, char const*, size_t, size_t, size_t, bool); void reportProgress(int64_t, int64_t, double&); std::string getCollectionUrlPart() const; void beginLine(size_t row); void addField(char const*, size_t, size_t row, size_t column, bool escaped); void addLastField(char const*, size_t, size_t row, size_t column, bool escaped); bool collectionExists(); bool checkCreateCollection(); bool truncateCollection(); void sendCsvBuffer(); void sendJsonBuffer(char const* str, size_t len, bool isObject); SenderThread* findIdleSender(); void waitForSenders(); private: std::unique_ptr _httpClient; std::atomic _maxUploadSize; std::atomic _periodByteCount; bool const _autoUploadSize; std::unique_ptr _autoTuneThread; std::vector> _senderThreads; uint32_t const _threadCount; basics::ConditionVariable _threadsCondition; basics::StringBuffer _tempBuffer; std::string _separator; std::string _quote; std::string _createCollectionType; bool _useBackslash; bool _convert; bool _createCollection; bool _overwrite; bool _progress; bool _firstChunk; bool _ignoreMissing; size_t _numberLines; ImportStatistics _stats; size_t _rowsRead; size_t _rowOffset; size_t _rowsToSkip; int64_t _keyColumn; std::string _onDuplicateAction; std::string _collectionName; std::string _fromCollectionPrefix; std::string _toCollectionPrefix; arangodb::basics::StringBuffer _lineBuffer; arangodb::basics::StringBuffer _outputBuffer; std::string _firstLine; std::vector _columnNames; std::unordered_map _translations; std::unordered_set _removeAttributes; bool _hasError; std::vector _errorMessages; static double const ProgressStep; }; } // namespace import } // namespace arangodb #endif