//////////////////////////////////////////////////////////////////////////////// /// DISCLAIMER /// /// Copyright 2016 ArangoDB GmbH, Cologne, Germany /// /// Licensed under the Apache License, Version 2.0 (the "License"); /// you may not use this file except in compliance with the License. /// You may obtain a copy of the License at /// /// http://www.apache.org/licenses/LICENSE-2.0 /// /// Unless required by applicable law or agreed to in writing, software /// distributed under the License is distributed on an "AS IS" BASIS, /// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. /// See the License for the specific language governing permissions and /// limitations under the License. /// /// Copyright holder is ArangoDB GmbH, Cologne, Germany /// /// @author Jan Steemann //////////////////////////////////////////////////////////////////////////////// #include "ImportFeature.h" #include "ApplicationFeatures/ApplicationServer.h" #include "Basics/FileUtils.h" #include "Basics/StringUtils.h" #include "Import/ImportHelper.h" #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "Shell/ClientFeature.h" #include "SimpleHttpClient/GeneralClientConnection.h" #include "SimpleHttpClient/SimpleHttpClient.h" #include "SimpleHttpClient/SimpleHttpResult.h" #include #include using namespace arangodb::basics; using namespace arangodb::httpclient; using namespace arangodb::options; namespace arangodb { ImportFeature::ImportFeature(application_features::ApplicationServer& server, int* result) : ApplicationFeature(server, "Import"), _filename(""), _useBackslash(false), _convert(true), _autoChunkSize(true), _chunkSize(1024 * 1024 * 1), _threadCount(2), _collectionName(""), _fromCollectionPrefix(""), _toCollectionPrefix(""), _createCollection(false), _createDatabase(false), _createCollectionType("document"), _typeImport("json"), _overwrite(false), _quote("\""), _separator(""), _progress(true), _ignoreMissing(false), _onDuplicateAction("error"), _rowsToSkip(0), _result(result), _latencyStats(false) { requiresElevatedPrivileges(false); setOptional(false); startsAfter("BasicsPhase"); } void ImportFeature::collectOptions(std::shared_ptr options) { options->addOption("--file", "file name (\"-\" for STDIN)", new StringParameter(&_filename)); options->addOption( "--backslash-escape", "use backslash as the escape character for quotes, used for csv", new BooleanParameter(&_useBackslash)); options->addOption("--batch-size", "size for individual data batches (in bytes)", new UInt64Parameter(&_chunkSize)); options->addOption( "--threads", "Number of parallel import threads. Most useful for the rocksdb engine", new UInt32Parameter(&_threadCount)); options->addOption("--collection", "collection name", new StringParameter(&_collectionName)); options->addOption("--from-collection-prefix", "_from collection name prefix (will be prepended to all " "values in '_from')", new StringParameter(&_fromCollectionPrefix)); options->addOption( "--to-collection-prefix", "_to collection name prefix (will be prepended to all values in '_to')", new StringParameter(&_toCollectionPrefix)); options->addOption("--create-collection", "create collection if it does not yet exist", new BooleanParameter(&_createCollection)); options->addOption("--create-database", "create the target database if it does not exist", new BooleanParameter(&_createDatabase)); options->addOption("--skip-lines", "number of lines to skip for formats (csv and tsv only)", new UInt64Parameter(&_rowsToSkip)); options->addOption("--convert", "convert the strings 'null', 'false', 'true' and strings " "containing numbers into non-string types (csv and tsv " "only)", new BooleanParameter(&_convert)); options->addOption("--translate", "translate an attribute name (use as --translate " "\"from=to\", for csv and tsv only)", new VectorParameter(&_translations)); options->addOption("--remove-attribute", "remove an attribute before inserting an attribute" " into a collection (for csv and tsv only)", new VectorParameter(&_removeAttributes)); std::unordered_set types = {"document", "edge"}; std::vector typesVector(types.begin(), types.end()); std::string typesJoined = StringUtils::join(typesVector, " or "); options->addOption("--create-collection-type", "type of collection if collection is created (" + typesJoined + ")", new DiscreteValuesParameter(&_createCollectionType, types)); std::unordered_set imports = {"csv", "tsv", "json", "jsonl", "auto"}; options->addOption("--type", "type of import file", new DiscreteValuesParameter(&_typeImport, imports)); options->addOption( "--overwrite", "overwrite collection if it exist (WARNING: this will remove any data " "from the collection)", new BooleanParameter(&_overwrite)); options->addOption("--quote", "quote character(s), used for csv", new StringParameter(&_quote)); options->addOption("--separator", "field separator, used for csv and tsv", new StringParameter(&_separator)); options->addOption("--progress", "show progress", new BooleanParameter(&_progress)); options->addOption("--ignore-missing", "ignore missing columns in csv input", new BooleanParameter(&_ignoreMissing)); std::unordered_set actions = {"error", "update", "replace", "ignore"}; std::vector actionsVector(actions.begin(), actions.end()); std::string actionsJoined = StringUtils::join(actionsVector, ", "); options->addOption("--on-duplicate", "action to perform when a unique key constraint " "violation occurs. Possible values: " + actionsJoined, new DiscreteValuesParameter(&_onDuplicateAction, actions)); options->addOption( "--latency", "show 10 second latency statistics (values in microseconds)", new BooleanParameter(&_latencyStats)); } void ImportFeature::validateOptions(std::shared_ptr options) { auto const& positionals = options->processingResult()._positionals; size_t n = positionals.size(); if (1 == n) { // only take positional file name attribute into account if user // did not specify the --file option as well if (!options->processingResult().touched("--file")) { _filename = positionals[0]; } } else if (1 < n) { LOG_TOPIC("0dc12", FATAL, arangodb::Logger::FIXME) << "expecting at most one filename, got " + StringUtils::join(positionals, ", "); FATAL_ERROR_EXIT(); } // _chunkSize is dynamic ... unless user explicitly sets it _autoChunkSize = !options->processingResult().touched("--batch-size"); if (_chunkSize > arangodb::import::ImportHelper::MaxBatchSize) { // it's not sensible to raise the batch size beyond this value // because the server has a built-in limit for the batch size too // and will reject bigger HTTP request bodies LOG_TOPIC("e6d71", WARN, arangodb::Logger::FIXME) << "capping --batch-size value to " << arangodb::import::ImportHelper::MaxBatchSize; _chunkSize = arangodb::import::ImportHelper::MaxBatchSize; } if (_threadCount < 1) { // it's not sensible to use just one thread LOG_TOPIC("9e3f9", WARN, arangodb::Logger::FIXME) << "capping --threads value to " << 1; _threadCount = 1; } if (_threadCount > TRI_numberProcessors() * 2) { // it's not sensible to use just one thread ... // and import's CPU usage is negligible, real limit is cluster cores LOG_TOPIC("aca46", WARN, arangodb::Logger::FIXME) << "capping --threads value to " << TRI_numberProcessors() * 2; _threadCount = (uint32_t)TRI_numberProcessors() * 2; } for (auto const& it : _translations) { auto parts = StringUtils::split(it, "="); if (parts.size() != 2) { LOG_TOPIC("e322b", FATAL, arangodb::Logger::FIXME) << "invalid translation '" << it << "'"; FATAL_ERROR_EXIT(); } StringUtils::trimInPlace(parts[0]); StringUtils::trimInPlace(parts[1]); if (parts[0].empty() || parts[1].empty()) { LOG_TOPIC("83ae7", FATAL, arangodb::Logger::FIXME) << "invalid translation '" << it << "'"; FATAL_ERROR_EXIT(); } } for (std::string& str : _removeAttributes) { StringUtils::trimInPlace(str); if (str.empty()) { LOG_TOPIC("74cfc", FATAL, arangodb::Logger::FIXME) << "cannot remove an empty attribute"; FATAL_ERROR_EXIT(); } } } void ImportFeature::start() { ClientFeature* client = application_features::ApplicationServer::getFeature( "Client"); int ret = EXIT_SUCCESS; *_result = ret; if (_typeImport == "auto") { std::regex re = std::regex(".*?\\.([a-zA-Z]+)(.gz|)", std::regex::ECMAScript); std::smatch match; if (std::regex_match(_filename, match, re)) { std::string extension = StringUtils::tolower(match[1].str()); if (extension == "json" || extension == "jsonl" || extension == "csv" || extension == "tsv") { _typeImport = extension; } else { LOG_TOPIC("cb067", FATAL, arangodb::Logger::FIXME) << "Unsupported file extension '" << extension << "'"; FATAL_ERROR_EXIT(); } } else { LOG_TOPIC("0ee99", WARN, arangodb::Logger::FIXME) << "Unable to auto-detect file type from filename '" << _filename << "'. using filetype 'json'"; _typeImport = "json"; } } try { _httpClient = client->createHttpClient(); } catch (...) { LOG_TOPIC("8477c", FATAL, arangodb::Logger::FIXME) << "cannot create server connection, giving up!"; FATAL_ERROR_EXIT(); } _httpClient->params().setLocationRewriter(static_cast(client), &rewriteLocation); _httpClient->params().setUserNamePassword("/", client->username(), client->password()); // must stay here in order to establish the connection int err = TRI_ERROR_NO_ERROR; auto versionString = _httpClient->getServerVersion(&err); auto const dbName = client->databaseName(); auto successfulConnection = [&]() { std::cout << ClientFeature::buildConnectedMessage(_httpClient->getEndpointSpecification(), versionString, /*role*/ "", /*mode*/ "", client->databaseName(), client->username()) << std::endl; std::cout << "----------------------------------------" << std::endl; std::cout << "database: " << client->databaseName() << std::endl; std::cout << "collection: " << _collectionName << std::endl; if (!_fromCollectionPrefix.empty()) { std::cout << "from collection prefix: " << _fromCollectionPrefix << std::endl; } if (!_toCollectionPrefix.empty()) { std::cout << "to collection prefix: " << _toCollectionPrefix << std::endl; } std::cout << "create: " << (_createCollection ? "yes" : "no") << std::endl; std::cout << "create database: " << (_createDatabase ? "yes" : "no") << std::endl; std::cout << "source filename: " << _filename << std::endl; std::cout << "file type: " << _typeImport << std::endl; if (_typeImport == "csv") { std::cout << "quote: " << _quote << std::endl; } if (_typeImport == "csv" || _typeImport == "tsv") { std::cout << "separator: " << _separator << std::endl; } std::cout << "threads: " << _threadCount << std::endl; std::cout << "connect timeout: " << client->connectionTimeout() << std::endl; std::cout << "request timeout: " << client->requestTimeout() << std::endl; std::cout << "----------------------------------------" << std::endl; }; if (_createDatabase && err == TRI_ERROR_ARANGO_DATABASE_NOT_FOUND) { // database not found, but database creation requested std::cout << "Creating database '" << dbName << "'" << std::endl; client->setDatabaseName("_system"); int res = tryCreateDatabase(client, dbName); if (res != TRI_ERROR_NO_ERROR) { LOG_TOPIC("90431", ERR, arangodb::Logger::FIXME) << "Could not create database '" << dbName << "'"; LOG_TOPIC("891eb", FATAL, arangodb::Logger::FIXME) << _httpClient->getErrorMessage() << "'"; FATAL_ERROR_EXIT(); } // restore old database name client->setDatabaseName(dbName); err = TRI_ERROR_NO_ERROR; versionString = _httpClient->getServerVersion(&err); if (err != TRI_ERROR_NO_ERROR) { // disconnect, as this will make the import abort _httpClient->disconnect(); } } if (!_httpClient->isConnected()) { LOG_TOPIC("541c6", ERR, arangodb::Logger::FIXME) << "Could not connect to endpoint '" << client->endpoint() << "', database: '" << client->databaseName() << "', username: '" << client->username() << "'"; LOG_TOPIC("034c9", FATAL, arangodb::Logger::FIXME) << _httpClient->getErrorMessage() << "'"; FATAL_ERROR_EXIT(); } TRI_ASSERT(client->databaseName() == dbName); // successfully connected // now print connection info successfulConnection(); _httpClient->disconnect(); // we do not reuse this anymore SimpleHttpClientParams params = _httpClient->params(); arangodb::import::ImportHelper ih(client, client->endpoint(), params, _chunkSize, _threadCount, _autoChunkSize); // create colletion if (_createCollection) { ih.setCreateCollection(true); } if (_createCollectionType == "document" || _createCollectionType == "edge") { ih.setCreateCollectionType(_createCollectionType); } ih.setConversion(_convert); ih.setRowsToSkip(static_cast(_rowsToSkip)); ih.setOverwrite(_overwrite); ih.useBackslash(_useBackslash); ih.ignoreMissing(_ignoreMissing); std::unordered_map translations; for (auto const& it : _translations) { auto parts = StringUtils::split(it, "="); TRI_ASSERT(parts.size() == 2); // already validated before StringUtils::trimInPlace(parts[0]); StringUtils::trimInPlace(parts[1]); translations.emplace(parts[0], parts[1]); } ih.setTranslations(translations); ih.setRemoveAttributes(_removeAttributes); // quote if (_quote.length() <= 1) { ih.setQuote(_quote); } else { LOG_TOPIC("f0b3a", FATAL, arangodb::Logger::FIXME) << "Wrong length of quote character."; FATAL_ERROR_EXIT(); } if (_separator.empty()) { _separator = ","; if (_typeImport == "tsv") { _separator = "\\t"; } } // separator if (_separator.length() == 1 || _separator == "\\r" || _separator == "\\n" || _separator == "\\t") { ih.setSeparator(_separator); } else { LOG_TOPIC("59186", FATAL, arangodb::Logger::FIXME) << "_separator must be exactly one character."; FATAL_ERROR_EXIT(); } // collection name if (_collectionName == "") { LOG_TOPIC("a64ef", FATAL, arangodb::Logger::FIXME) << "Collection name is missing."; FATAL_ERROR_EXIT(); } // filename if (_filename == "") { LOG_TOPIC("10531", FATAL, arangodb::Logger::FIXME) << "File name is missing."; FATAL_ERROR_EXIT(); } if (_filename != "-" && !FileUtils::isRegularFile(_filename)) { if (!FileUtils::exists(_filename)) { LOG_TOPIC("6f83e", FATAL, arangodb::Logger::FIXME) << "Cannot open file '" << _filename << "'. File not found."; } else if (FileUtils::isDirectory(_filename)) { LOG_TOPIC("70dac", FATAL, arangodb::Logger::FIXME) << "Specified file '" << _filename << "' is a directory. Please use a regular file."; } else { LOG_TOPIC("8699d", FATAL, arangodb::Logger::FIXME) << "Cannot open '" << _filename << "'. Invalid file type."; } FATAL_ERROR_EXIT(); } // progress if (_progress) { ih.setProgress(true); } // progress if (_latencyStats) { ih.startHistogram(); } if (_onDuplicateAction != "error" && _onDuplicateAction != "update" && _onDuplicateAction != "replace" && _onDuplicateAction != "ignore") { LOG_TOPIC("6ad02", FATAL, arangodb::Logger::FIXME) << "Invalid value for '--on-duplicate'. Possible values: 'error', " "'update', 'replace', 'ignore'."; FATAL_ERROR_EXIT(); } ih.setOnDuplicateAction(_onDuplicateAction); try { bool ok = false; // set prefixes ih.setFrom(_fromCollectionPrefix); ih.setTo(_toCollectionPrefix); // import type if (_typeImport == "csv") { std::cout << "Starting CSV import..." << std::endl; ok = ih.importDelimited(_collectionName, _filename, arangodb::import::ImportHelper::CSV); } else if (_typeImport == "tsv") { std::cout << "Starting TSV import..." << std::endl; ih.setQuote(""); ok = ih.importDelimited(_collectionName, _filename, arangodb::import::ImportHelper::TSV); } else if (_typeImport == "json" || _typeImport == "jsonl") { std::cout << "Starting JSON import..." << std::endl; ok = ih.importJson(_collectionName, _filename, (_typeImport == "jsonl")); } else { LOG_TOPIC("8941e", FATAL, arangodb::Logger::FIXME) << "Wrong type '" << _typeImport << "'."; FATAL_ERROR_EXIT(); } std::cout << std::endl; // give information about import (even if errors occur) std::cout << "created: " << ih.getNumberCreated() << std::endl; std::cout << "warnings/errors: " << ih.getNumberErrors() << std::endl; std::cout << "updated/replaced: " << ih.getNumberUpdated() << std::endl; std::cout << "ignored: " << ih.getNumberIgnored() << std::endl; if (_typeImport == "csv" || _typeImport == "tsv") { std::cout << "lines read: " << ih.getReadLines() << std::endl; } if (!ok) { auto const& msgs = ih.getErrorMessages(); if (!msgs.empty()) { LOG_TOPIC("46995", ERR, arangodb::Logger::FIXME) << "error message(s):"; for (std::string const& msg : msgs) { LOG_TOPIC("25049", ERR, arangodb::Logger::FIXME) << msg; } } } } catch (std::exception const& ex) { LOG_TOPIC("a7dca", ERR, arangodb::Logger::FIXME) << "caught exception: " << ex.what(); } catch (...) { LOG_TOPIC("fc131", ERR, arangodb::Logger::FIXME) << "caught unknown exception"; } *_result = ret; } int ImportFeature::tryCreateDatabase(ClientFeature* client, std::string const& name) { VPackBuilder builder; builder.openObject(); builder.add("name", VPackValue(name)); builder.add("users", VPackValue(VPackValueType::Array)); builder.openObject(); builder.add("username", VPackValue(client->username())); builder.add("passwd", VPackValue(client->password())); builder.close(); builder.close(); builder.close(); std::string const body = builder.slice().toJson(); std::unique_ptr response( _httpClient->request(rest::RequestType::POST, "/_api/database", body.c_str(), body.size())); if (response == nullptr || !response->isComplete()) { return TRI_ERROR_INTERNAL; } auto returnCode = response->getHttpReturnCode(); if (returnCode == static_cast(rest::ResponseCode::OK) || returnCode == static_cast(rest::ResponseCode::CREATED)) { // all ok return TRI_ERROR_NO_ERROR; } if (returnCode == static_cast(rest::ResponseCode::UNAUTHORIZED) || returnCode == static_cast(rest::ResponseCode::FORBIDDEN)) { // invalid authorization _httpClient->setErrorMessage(getHttpErrorMessage(response.get(), nullptr), false); return TRI_ERROR_FORBIDDEN; } // any other error _httpClient->setErrorMessage(getHttpErrorMessage(response.get(), nullptr), false); return TRI_ERROR_INTERNAL; } } // namespace arangodb