1
0
Fork 0

fixes in batch import

This commit is contained in:
Jan Steemann 2012-04-16 16:59:29 +02:00
parent db2e47c3ef
commit a98f2a8771
6 changed files with 247 additions and 59 deletions

View File

@ -180,6 +180,11 @@ static TRI_json_t* ParseList (yyscan_t scanner) {
int c;
list = TRI_CreateListJson();
if (list == NULL) {
return NULL;
}
c = yylex(scanner);
comma = false;
@ -210,8 +215,8 @@ static TRI_json_t* ParseList (yyscan_t scanner) {
return NULL;
}
TRI_PushBack3ListJson(list, sub);
TRI_PushBackListJson(list, sub);
TRI_FreeJson(sub); // TODO remove unnecessary copying
c = yylex(scanner);
}
@ -292,9 +297,8 @@ static TRI_json_t* ParseArray (yyscan_t scanner) {
return NULL;
}
TRI_InsertArrayJson(array, name, sub);
TRI_Insert3ArrayJson(array, name, sub);
TRI_FreeString(name);
TRI_FreeJson(sub); // TODO remove unnecessary copying
c = yylex(scanner);
}
@ -312,13 +316,6 @@ static TRI_json_t* ParseArray (yyscan_t scanner) {
static TRI_json_t* ParseObject (yyscan_t scanner, int c) {
struct yyguts_t * yyg = (struct yyguts_t*) scanner;
char buffer[1024];
char* ep;
char* ptr;
double d;
size_t outLength;
switch (c) {
case END_OF_FILE:
yyextra.message = "expecting atom, got end-of-file";
@ -333,7 +330,11 @@ static TRI_json_t* ParseObject (yyscan_t scanner, int c) {
case NULL_CONSTANT:
return TRI_CreateNullJson();
case NUMBER_CONSTANT:
case NUMBER_CONSTANT: {
char buffer[512];
char* ep;
double d;
if ((size_t) yyleng >= sizeof(buffer)) {
yyextra.message = "number too big";
return NULL;
@ -360,11 +361,16 @@ static TRI_json_t* ParseObject (yyscan_t scanner, int c) {
}
return TRI_CreateNumberJson(d);
}
case STRING_CONSTANT: {
char* ptr;
size_t outLength;
case STRING_CONSTANT:
ptr = TRI_UnescapeUtf8String(yytext + 1, yyleng - 2, &outLength);
return TRI_CreateString2Json(ptr, outLength);
}
case OPEN_BRACE:
return ParseArray(scanner);

View File

@ -143,7 +143,7 @@ HttpHandler::status_e RestImportHandler::execute () {
////////////////////////////////////////////////////////////////////////////////
/// @brief creates documents
///
/// @REST{POST /import?collection=@FA{collection-identifier}}
/// @REST{POST /_api/import?collection=@FA{collection-identifier}}
///
/// Creates documents in the collection identified by the
/// @FA{collection-identifier}. The JSON representations of the documents must
@ -165,7 +165,7 @@ bool RestImportHandler::createDocument () {
return false;
}
// extract the cid
// extract the collection name
bool found;
string collection = request->value("collection", found);
@ -176,6 +176,16 @@ bool RestImportHandler::createDocument () {
return false;
}
bool documentMode = false;
// extract the import type
string type = request->value("type", found);
if (found && type == "documents") {
LOGGER_TRACE << "Import JSON documents";
documentMode = true;
}
// shall we create the collection?
string createStr = request->value("createCollection", found);
bool create = found ? StringUtils::boolean(createStr) : false;
@ -191,11 +201,9 @@ bool RestImportHandler::createDocument () {
}
TRI_json_t* keys = 0;
bool documentMode = false;
string line = request->body().substr(start, next);
// get first line
if (line != "") {
@ -208,21 +216,27 @@ bool RestImportHandler::createDocument () {
return false;
}
if (keys->_type == TRI_JSON_LIST) {
if (documentMode) {
if (keys->_type != TRI_JSON_ARRAY) {
TRI_FreeJson(keys);
generateError(HttpResponse::BAD,
TRI_ERROR_AVOCADO_CORRUPTED_DATAFILE,
"No JSON array in first line found");
return false;
}
}
else if (keys->_type == TRI_JSON_LIST) {
if (!checkKeys(keys)) {
TRI_DestroyJson(keys);
TRI_FreeJson(keys);
generateError(HttpResponse::BAD,
TRI_ERROR_AVOCADO_CORRUPTED_DATAFILE,
"No JSON string list in first line found");
return false;
}
start = next+1;
}
else if (keys->_type == TRI_JSON_ARRAY) {
documentMode = true;
start = next + 1;
}
else {
TRI_DestroyJson(keys);
TRI_FreeJson(keys);
generateError(HttpResponse::BAD,
TRI_ERROR_AVOCADO_CORRUPTED_DATAFILE,
"Wrong JSON data");
@ -243,7 +257,7 @@ bool RestImportHandler::createDocument () {
releaseCollection();
if (keys) {
TRI_DestroyJson(keys);
TRI_FreeJson(keys);
}
generateError(HttpResponse::BAD,
@ -258,7 +272,7 @@ bool RestImportHandler::createDocument () {
_documentCollection->beginWrite(_documentCollection);
while (next != string::npos) {
while (next != string::npos && start < request->body().length()) {
next = request->body().find('\n', start);
if (next == string::npos) {
@ -266,10 +280,14 @@ bool RestImportHandler::createDocument () {
}
else {
line = request->body().substr(start, next - start);
start = next + 1;
start = next + 1;
}
if (line.length() == 0) {
continue;
}
//printf("**** line = %s **********\n" , line.c_str());
//LOGGER_TRACE << "line = " << line;
TRI_json_t* values = parseJsonLine(line);
TRI_json_t* json = 0;
@ -284,32 +302,36 @@ bool RestImportHandler::createDocument () {
else {
// build the json object from the list
json = createJsonObject(keys, values);
TRI_DestroyJson(values);
TRI_FreeJson(values);
if (!json) {
LOGGER_WARNING << "no valid JSON data in line: " << (line);
++numError;
continue;
}
}
if (json) {
// now save the document
TRI_doc_mptr_t const mptr = _documentCollection->createJson(_documentCollection, TRI_DOC_MARKER_DOCUMENT, json, 0, false);
if (mptr._did != 0) {
++numCreated;
}
else {
++numError;
}
TRI_DestroyJson(json);
}
else {
LOGGER_WARNING << "ignored line: " << (line);
// now save the document
TRI_doc_mptr_t const mptr = _documentCollection->createJson(_documentCollection, TRI_DOC_MARKER_DOCUMENT, json, 0, false);
if (mptr._did != 0) {
++numCreated;
}
else {
++numError;
}
TRI_FreeJson(json);
}
else {
LOGGER_WARNING << "no values in line: " << (line);
LOGGER_WARNING << "no JSON data in line: " << (line);
++numError;
}
}
if (keys) {
TRI_DestroyJson(keys);
TRI_FreeJson(keys);
}
_documentCollection->endWrite(_documentCollection);

View File

@ -99,7 +99,7 @@ string RestVocbaseBaseHandler::COLLECTION_PATH = "/collection";
/// @brief documents import path
////////////////////////////////////////////////////////////////////////////////
string RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH = "/import";
string RestVocbaseBaseHandler::DOCUMENT_IMPORT_PATH = "/_api/import";
////////////////////////////////////////////////////////////////////////////////

View File

@ -69,7 +69,8 @@ namespace triagens {
_numConnectRetries = 0;
_result = 0;
_errorMessage = "";
_written = 0;
// _writeBuffer.clear();
reset();
@ -198,6 +199,7 @@ namespace triagens {
if (_isConnected) {
// can write now
_state = IN_WRITE;
_written = 0;
}
}
@ -377,6 +379,7 @@ namespace triagens {
if (_isConnected) {
// we are connected start with writing
_state = IN_WRITE;
_written = 0;
}
else {
// connect to server
@ -404,25 +407,29 @@ namespace triagens {
if (!checkSocket()) {
return false;
}
//printf("write():\n%s\n", _writeBuffer);
//printf("write():\n%s\n", (_writeBuffer.c_str() + _written));
#ifdef __APPLE__
int status = ::send(_socket, _writeBuffer.c_str(), _writeBuffer.length(), 0);
int status = ::send(_socket, _writeBuffer.c_str() + _written, _writeBuffer.length() - _written, 0);
#else
int status = ::send(_socket, _writeBuffer.c_str(), _writeBuffer.length(), MSG_NOSIGNAL);
int status = ::send(_socket, _writeBuffer.c_str() + _written, _writeBuffer.length() - _written, MSG_NOSIGNAL);
#endif
if (status == -1) {
_errorMessage = "::send() failed with: " + string(strerror(errno));
LOGGER_TRACE << "::send() failed with " << strerror(errno);
//LOGGER_ERROR << "::send() failed with " << strerror(errno);
close();
return false;
}
_state = IN_READ_HEADER;
_written += status;
if (_written == _writeBuffer.length()) {
_state = IN_READ_HEADER;
}
return true;
}

View File

@ -309,6 +309,8 @@ namespace triagens {
request_state _state;
size_t _written;
uint32_t _nextChunkedSize;
bool _isConnected;

View File

@ -28,6 +28,7 @@
#include <v8.h>
#include <stdio.h>
#include <fstream>
#include "build.h"
@ -177,6 +178,12 @@ regex_t doubleRegex;
regex_t intRegex;
////////////////////////////////////////////////////////////////////////////////
/// @brief max size body size (used for imports)
////////////////////////////////////////////////////////////////////////////////
static size_t maxUploadSize = 500000;
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
@ -294,7 +301,7 @@ static void sendBuffer (TRI_csv_parser_t* parser) {
// internalPrint("Import send: %s\n", buffer->c_str());
SimpleHttpResult* result = client->request(SimpleHttpClient::POST, "/import?collection=" + StringUtils::urlEncode(*collection), buffer->c_str(), buffer->length(), headerFields);
SimpleHttpResult* result = client->request(SimpleHttpClient::POST, "/_api/import?collection=" + StringUtils::urlEncode(*collection), buffer->c_str(), buffer->length(), headerFields);
VariantArray* va = result->getBodyAsVariantArray();
@ -410,7 +417,7 @@ static void ProcessCsvEnd (TRI_csv_parser_t* parser, char const* field, size_t r
StringBuffer* buffer = reinterpret_cast< StringBuffer* >(parser->_dataBegin);
buffer->appendChar(']');
if (buffer->length() > 100000) {
if (buffer->length() > maxUploadSize) {
sendBuffer(parser);
}
}
@ -528,12 +535,12 @@ static v8::Handle<v8::Value> JS_ImportCsvFile (v8::Arguments const& argv) {
sendBuffer(&parser);
}
close(fd);
//internalPrint("Test:\n%s\n", stringBuffer.c_str());
string msg = "Imported '" + StringUtils::itoa(numbers.first) + "' lines with '" + StringUtils::itoa(numbers.second) + "' errors.";
string msg = "Imported '" + StringUtils::itoa(numbers.first) + "' with '" + StringUtils::itoa(numbers.second) + "' errors.";
return scope.Close(v8::String::New(msg.c_str()));
internalPrint("%s\n", msg.c_str());
return scope.Close(v8::Integer::New(numbers.first));
}
////////////////////////////////////////////////////////////////////////////////
@ -541,6 +548,146 @@ static v8::Handle<v8::Value> JS_ImportCsvFile (v8::Arguments const& argv) {
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- JSON import functions
// -----------------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////////////
/// @addtogroup V8Shell
/// @{
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
/// @brief send buffer with JSON data
////////////////////////////////////////////////////////////////////////////////
static void sendJsonBuffer (const char* str, size_t len, size_t& created, size_t& errors, const string& collection) {
SimpleHttpClient* client = clientConnection->getHttpClient();
map<string, string> headerFields;
//internalPrint("Import send: %s\n", buffer.c_str());
SimpleHttpResult* result = client->request(SimpleHttpClient::POST, "/_api/import?type=documents&collection=" + StringUtils::urlEncode(collection), str, len, headerFields);
VariantArray* va = result->getBodyAsVariantArray();
if (va) {
VariantBoolean* vb = va->lookupBoolean("error");
if (vb && vb->getValue()) {
// is error
VariantString* vs = va->lookupString("errorMessage");
if (vs) {
internalPrint("Import error: %s\n", vs->getValue().c_str());
}
else {
internalPrint("Import error: unknow error\n");
}
}
VariantInt64* vi = va->lookupInt64("created");
if (vi && vi->getValue() > 0) {
created += (size_t) vi->getValue();
}
vi = va->lookupInt64("errors");
if (vi && vi->getValue() > 0) {
errors += (size_t) vi->getValue();
}
delete va;
}
else {
internalPrint("Import error: unknow error\n");
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief imports a JSON file
///
/// @FUN{importJsonFile(@FA{filename}, @FA{collection})}
///
/// Imports data of a CSV file. The data is imported to @FA{collection}.
///
////////////////////////////////////////////////////////////////////////////////
static v8::Handle<v8::Value> JS_ImportJsonFile (v8::Arguments const& argv) {
v8::HandleScope scope;
if (argv.Length() < 2) {
return scope.Close(v8::ThrowException(v8::String::New("usage: importJsonFile(<filename>, <collection>)")));
}
// extract the filename
v8::String::Utf8Value filename(argv[0]);
if (*filename == 0) {
return scope.Close(v8::ThrowException(v8::String::New("<filename> must be an UTF8 filename")));
}
v8::String::Utf8Value collection(argv[1]);
if (*collection == 0) {
return scope.Close(v8::ThrowException(v8::String::New("<collection> must be an UTF8 filename")));
}
// read and convert
int fd = open(*filename, O_RDONLY);
if (fd < 0) {
return scope.Close(v8::ThrowException(v8::String::New(TRI_LAST_ERROR_STR)));
}
StringBuffer stringBuffer;
string collectionName = TRI_ObjectToString(argv[1]);
size_t created = 0;
size_t errors = 0;
char buffer[10240];
while (true) {
v8::HandleScope scope;
ssize_t n = read(fd, buffer, 10239);
if (n < 0) {
return scope.Close(v8::ThrowException(v8::String::New(TRI_LAST_ERROR_STR)));
}
else if (n == 0) {
break;
}
stringBuffer.appendText(buffer, n);
if (stringBuffer.length() > maxUploadSize) {
const char* first = stringBuffer.c_str();
char* pos = (char*) memrchr(first, '\n', stringBuffer.length());
if (pos != 0) {
size_t len = pos - first + 1;
sendJsonBuffer(first, len, created, errors, collectionName);
stringBuffer.erase_front(len);
}
}
}
if (stringBuffer.length() > 0) {
sendJsonBuffer(stringBuffer.c_str(), stringBuffer.length(), created, errors, collectionName);
}
close(fd);
string msg = "Imported '" + StringUtils::itoa(created) + "' objects with '" + StringUtils::itoa(errors) + "' errors.";
internalPrint("%s\n", msg.c_str());
return scope.Close(v8::Integer::New(created));
}
////////////////////////////////////////////////////////////////////////////////
/// @}
////////////////////////////////////////////////////////////////////////////////
// -----------------------------------------------------------------------------
// --SECTION-- private functions
@ -646,6 +793,7 @@ static void ParseProgramOptions (int argc, char* argv[]) {
("no-colors", "deactivate color support")
("no-auto-complete", "disable auto completion")
("unit-tests", &UnitTests, "do not start as shell, run unit tests instead")
("max-upload-size", &maxUploadSize, "maximum size of import chunks")
(hidden, true)
;
@ -1330,6 +1478,9 @@ int main (int argc, char* argv[]) {
context->Global()->Set(v8::String::New("importCsvFile"),
v8::FunctionTemplate::New(JS_ImportCsvFile)->GetFunction(),
v8::ReadOnly);
context->Global()->Set(v8::String::New("importJsonFile"),
v8::FunctionTemplate::New(JS_ImportJsonFile)->GetFunction(),
v8::ReadOnly);
// http://www.network-science.de/ascii/ Font: ogre