1
0
Fork 0

fix non-blocking stream cursors (#6064)

This commit is contained in:
Simon 2018-08-03 12:41:52 +02:00 committed by Michael Hackstein
parent e3ce2e8cfb
commit a79c8af310
14 changed files with 108 additions and 130 deletions

View File

@ -88,7 +88,8 @@ VPackSlice QueryResultCursor::next() {
/// @brief return the cursor size
size_t QueryResultCursor::count() const { return _iterator.size(); }
std::pair<ExecutionState, Result> QueryResultCursor::dump(VPackBuilder& builder, std::function<void()>&) {
std::pair<ExecutionState, Result> QueryResultCursor::dump(VPackBuilder& builder,
std::function<void()> const&) {
// This cursor cannot block, result already there.
auto res = dumpSync(builder);
return {ExecutionState::DONE, res};
@ -196,19 +197,22 @@ QueryStreamCursor::~QueryStreamCursor() {
}
if (_query) { // cursor is canceled or timed-out
// now remove the continue handler we may have registered in the query
_query->sharedState()->setContinueCallback();
// Query destructor will cleanup plan and abort transaction
_query.reset();
}
}
std::pair<ExecutionState, Result> QueryStreamCursor::dump(VPackBuilder& builder, std::function<void()>& continueHandler) {
std::pair<ExecutionState, Result> QueryStreamCursor::dump(VPackBuilder& builder,
std::function<void()> const& ch) {
TRI_ASSERT(batchSize() > 0);
LOG_TOPIC(TRACE, Logger::QUERIES) << "executing query " << _id << ": '"
<< _query->queryString().extract(1024) << "'";
// We will get a different RestHandler on every dump, so we need to update the Callback
std::shared_ptr<SharedQueryState> ss = _query->sharedState();
ss->setContinueHandler(continueHandler);
ss->setContinueHandler(ch);
try {
ExecutionState state = prepareDump();

View File

@ -59,7 +59,7 @@ class QueryResultCursor final : public arangodb::Cursor {
std::pair<ExecutionState, Result> dump(
velocypack::Builder& result,
std::function<void()>& continueHandler) override final;
std::function<void()> const& continueHandler) override final;
Result dumpSync(velocypack::Builder& result) override final;
@ -101,7 +101,7 @@ class QueryStreamCursor final : public arangodb::Cursor {
std::pair<ExecutionState, Result> dump(
velocypack::Builder& result,
std::function<void()>& continueHandler) override final;
std::function<void()> const& continueHandler) override final;
Result dumpSync(velocypack::Builder& result) override final;

View File

@ -78,7 +78,8 @@ VPackSlice MMFilesExportCursor::next() {
size_t MMFilesExportCursor::count() const { return _size; }
std::pair<aql::ExecutionState, Result> MMFilesExportCursor::dump(VPackBuilder& builder, std::function<void()>&) {
std::pair<aql::ExecutionState, Result> MMFilesExportCursor::dump(VPackBuilder& builder,
std::function<void()> const&) {
return {aql::ExecutionState::DONE, dumpSync(builder)};
}

View File

@ -56,7 +56,7 @@ class MMFilesExportCursor final : public Cursor {
std::pair<arangodb::aql::ExecutionState, Result> dump(
velocypack::Builder& result,
std::function<void()>& continueHandler) override final;
std::function<void()> const& ch) override final;
Result dumpSync(velocypack::Builder& result) override final;

View File

@ -48,11 +48,18 @@ RestCursorHandler::RestCursorHandler(
_query(nullptr),
_queryResult(),
_queryRegistry(queryRegistry),
_leasedCursor(nullptr),
_hasStarted(false),
_queryKilled(false),
_isValidForFinalize(false) {}
RestCursorHandler::~RestCursorHandler() {}
RestCursorHandler::~RestCursorHandler() {
if (_leasedCursor) {
auto cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
cursors->release(_leasedCursor);
}
}
RestStatus RestCursorHandler::execute() {
// extract the sub-request type
@ -74,9 +81,24 @@ RestStatus RestCursorHandler::continueExecute() {
// extract the sub-request type
rest::RequestType const type = _request->requestType();
if (type == rest::RequestType::POST) {
if (_query != nullptr) { // non-stream query
if (type == rest::RequestType::POST ||
type == rest::RequestType::PUT) {
return processQuery();
}
} else if (_leasedCursor) { // stream cursor query
Cursor* cs = _leasedCursor;
_leasedCursor = nullptr;
if (type == rest::RequestType::POST) {
return generateCursorResult(rest::ResponseCode::CREATED, cs);
} else if (type == rest::RequestType::PUT) {
if (_request->requestPath() == SIMPLE_QUERY_ALL_PATH) {
// RestSimpleQueryHandler::allDocuments uses PUT for cursor creation
return generateCursorResult(ResponseCode::CREATED, cs);
}
return generateCursorResult(ResponseCode::OK, cs);
}
}
// Other parts of the query cannot be paused
TRI_ASSERT(false);
@ -97,17 +119,17 @@ bool RestCursorHandler::cancel() {
/// If false we are done (error or stream)
////////////////////////////////////////////////////////////////////////////////
bool RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
RestStatus RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
TRI_ASSERT(_query == nullptr);
if (!slice.isObject()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_QUERY_EMPTY);
return false;
return RestStatus::DONE;
}
VPackSlice const querySlice = slice.get("query");
if (!querySlice.isString() || querySlice.getStringLength() == 0) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_QUERY_EMPTY);
return false;
return RestStatus::DONE;
}
VPackSlice const bindVars = slice.get("bindVars");
@ -115,7 +137,7 @@ bool RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
if (!bindVars.isObject() && !bindVars.isNull()) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_TYPE_ERROR,
"expecting object for <bindVars>");
return false;
return RestStatus::DONE;
}
}
@ -140,17 +162,19 @@ bool RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
if (stream) {
if (count) {
generateError(Result(TRI_ERROR_BAD_PARAMETER, "cannot use 'count' option for a streaming query"));
return RestStatus::DONE;
} else {
CursorRepository* cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
Cursor* cursor = cursors->createQueryStream(
querySlice.copyString(), bindVarsBuilder, _options, batchSize, ttl);
generateCursorResult(rest::ResponseCode::CREATED, cursor);
return generateCursorResult(rest::ResponseCode::CREATED, cursor);
}
return false; // done
}
// non-stream case. Execute query, then build a cursor
// with the entire result set.
VPackValueLength l;
char const* queryStr = querySlice.getString(l);
TRI_ASSERT(l > 0);
@ -171,7 +195,7 @@ bool RestCursorHandler::registerQueryOrCursor(VPackSlice const& slice) {
});
registerQuery(std::move(query));
return true;
return processQuery();
}
//////////////////////////////////////////////////////////////////////////////
@ -191,6 +215,7 @@ RestStatus RestCursorHandler::processQuery() {
unregisterQuery();
});
// continue handler is registered earlier
auto state = _query->execute(_queryRegistry, _queryResult);
if (state == aql::ExecutionState::WAITING) {
guard.cancel();
@ -200,11 +225,10 @@ RestStatus RestCursorHandler::processQuery() {
}
// We cannot get into HASMORE here, or we would lose results.
handleQueryResult();
return RestStatus::DONE;
return handleQueryResult();
}
void RestCursorHandler::handleQueryResult() {
RestStatus RestCursorHandler::handleQueryResult() {
if (_queryResult.code != TRI_ERROR_NO_ERROR) {
if (_queryResult.code == TRI_ERROR_REQUEST_CANCELED ||
(_queryResult.code == TRI_ERROR_QUERY_KILLED && wasCanceled())) {
@ -278,16 +302,16 @@ void RestCursorHandler::handleQueryResult() {
}
generateResult(rest::ResponseCode::CREATED, std::move(buffer),
_queryResult.context);
return RestStatus::DONE;
} else {
// result is bigger than batchSize, and a cursor will be created
CursorRepository* cursors = _vocbase.cursorRepository();
TRI_ASSERT(cursors != nullptr);
// result is bigger than batchSize, and a cursor will be created
TRI_ASSERT(_queryResult.result.get() != nullptr);
// steal the query result, cursor will take over the ownership
Cursor* cursor = cursors->createFromQueryResult(std::move(_queryResult),
batchSize, ttl, count);
generateCursorResult(rest::ResponseCode::CREATED, cursor);
return generateCursorResult(rest::ResponseCode::CREATED, cursor);
}
}
@ -440,7 +464,7 @@ void RestCursorHandler::buildOptions(VPackSlice const& slice) {
/// registry if required
//////////////////////////////////////////////////////////////////////////////
void RestCursorHandler::generateCursorResult(rest::ResponseCode code,
RestStatus RestCursorHandler::generateCursorResult(rest::ResponseCode code,
arangodb::Cursor* cursor) {
// always clean up
auto guard = scopeGuard([this, &cursor]() {
@ -453,14 +477,25 @@ void RestCursorHandler::generateCursorResult(rest::ResponseCode code,
std::shared_ptr<transaction::Context> ctx = cursor->context();
VPackBuffer<uint8_t> buffer;
VPackBuilder result(buffer);
result.openObject();
result.add(StaticStrings::Error, VPackValue(false));
result.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
// TODO maybe pull out the actual block fetching, so that this just builds
// the result and we may wait before.
Result r = cursor->dumpSync(result);
result.close();
VPackBuilder builder(buffer);
builder.openObject();
aql::ExecutionState state;
Result r;
auto self = shared_from_this();
std::tie(state, r) = cursor->dump(builder, [this, self]() {
continueHandlerExecution();
});
if (state == aql::ExecutionState::WAITING) {
builder.clear();
_leasedCursor = cursor;
guard.cancel();
return RestStatus::WAITING;
}
builder.add(StaticStrings::Error, VPackValue(false));
builder.add(StaticStrings::Code, VPackValue(static_cast<int>(code)));
builder.close();
if (r.ok()) {
_response->setContentType(rest::ContentType::JSON);
@ -468,6 +503,7 @@ void RestCursorHandler::generateCursorResult(rest::ResponseCode code,
} else {
generateError(r);
}
return RestStatus::DONE;
}
////////////////////////////////////////////////////////////////////////////////
@ -501,11 +537,7 @@ RestStatus RestCursorHandler::createQueryCursor() {
_isValidForFinalize = true;
TRI_ASSERT(_query == nullptr);
if (registerQueryOrCursor(body)) {
// We are in the non-streaming case
return processQuery();
}
return RestStatus::DONE;
return registerQueryOrCursor(body);
}
////////////////////////////////////////////////////////////////////////////////
@ -542,8 +574,7 @@ RestStatus RestCursorHandler::modifyQueryCursor() {
return RestStatus::DONE;
}
generateCursorResult(rest::ResponseCode::OK, cursor);
return RestStatus::DONE;
return generateCursorResult(rest::ResponseCode::OK, cursor);;
}
////////////////////////////////////////////////////////////////////////////////

View File

@ -78,7 +78,7 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
/// this method is also used by derived classes
//////////////////////////////////////////////////////////////////////////////
bool registerQueryOrCursor(arangodb::velocypack::Slice const& body);
RestStatus registerQueryOrCursor(arangodb::velocypack::Slice const& body);
//////////////////////////////////////////////////////////////////////////////
/// @brief Process the query registered in _query.
@ -100,7 +100,7 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
/// @brief handle the result returned by the query. This function is guaranteed
/// to not be interrupted and is guaranteed to get a complete queryResult.
//////////////////////////////////////////////////////////////////////////////
virtual void handleQueryResult();
virtual RestStatus handleQueryResult();
//////////////////////////////////////////////////////////////////////////////
/// @brief whether or not the query was canceled
@ -134,7 +134,7 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
/// registry if required
//////////////////////////////////////////////////////////////////////////////
void generateCursorResult(rest::ResponseCode code, arangodb::Cursor*);
RestStatus generateCursorResult(rest::ResponseCode code, arangodb::Cursor*);
//////////////////////////////////////////////////////////////////////////////
/// @brief create a cursor and return the first results
@ -175,6 +175,11 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
arangodb::aql::QueryRegistry* _queryRegistry;
//////////////////////////////////////////////////////////////////////////////
/// @brief leased query cursor, may be set by query continuation
//////////////////////////////////////////////////////////////////////////////
Cursor* _leasedCursor;
//////////////////////////////////////////////////////////////////////////////
/// @brief lock for currently running query
//////////////////////////////////////////////////////////////////////////////
@ -208,6 +213,7 @@ class RestCursorHandler : public RestVocbaseBaseHandler {
//////////////////////////////////////////////////////////////////////////////
std::shared_ptr<arangodb::velocypack::Builder> _options;
};
}

View File

@ -86,19 +86,6 @@ RestStatus RestSimpleHandler::execute() {
return RestStatus::DONE;
}
RestStatus RestSimpleHandler::continueExecute() {
// extract the sub-request type
rest::RequestType const type = _request->requestType();
if (type == rest::RequestType::PUT) {
return processQuery();
}
// Other parts of the query cannot be paused
TRI_ASSERT(false);
return RestStatus::DONE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock RestRemoveByKeys
////////////////////////////////////////////////////////////////////////////////
@ -178,22 +165,18 @@ RestStatus RestSimpleHandler::removeByKeys(VPackSlice const& slice) {
data.close(); // bindVars
data.close();
if (registerQueryOrCursor(data.slice())) {
return processQuery();
}
return RestStatus::DONE;
return registerQueryOrCursor(data.slice());
}
void RestSimpleHandler::handleQueryResult() {
RestStatus RestSimpleHandler::handleQueryResult() {
if (_queryResult.code != TRI_ERROR_NO_ERROR) {
if (_queryResult.code == TRI_ERROR_REQUEST_CANCELED ||
(_queryResult.code == TRI_ERROR_QUERY_KILLED && wasCanceled())) {
generateError(GeneralResponse::responseCode(TRI_ERROR_REQUEST_CANCELED), TRI_ERROR_REQUEST_CANCELED);
return;
}
} else {
generateError(GeneralResponse::responseCode(_queryResult.code), _queryResult.code, _queryResult.details);
return;
}
return RestStatus::DONE;
}
// extract the request type
@ -203,10 +186,10 @@ void RestSimpleHandler::handleQueryResult() {
if (type == rest::RequestType::PUT) {
if (prefix == RestVocbaseBaseHandler::SIMPLE_REMOVE_PATH) {
handleQueryResultRemoveByKeys();
return;
return RestStatus::DONE;
} else if (prefix == RestVocbaseBaseHandler::SIMPLE_LOOKUP_PATH) {
handleQueryResultLookupByKeys();
return;
return RestStatus::DONE;
}
}
@ -215,6 +198,7 @@ void RestSimpleHandler::handleQueryResult() {
TRI_ASSERT(false);
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
return RestStatus::DONE;
}
void RestSimpleHandler::handleQueryResultRemoveByKeys() {
@ -323,8 +307,5 @@ RestStatus RestSimpleHandler::lookupByKeys(VPackSlice const& slice) {
data.close(); // bindVars
data.close();
if (registerQueryOrCursor(data.slice())) {
return processQuery();
}
return RestStatus::DONE;
return registerQueryOrCursor(data.slice());
}

View File

@ -40,7 +40,6 @@ class RestSimpleHandler : public RestCursorHandler {
public:
RestStatus execute() override final;
RestStatus continueExecute() override final;
char const* name() const override final { return "RestSimpleHandler"; }
private:
@ -50,7 +49,7 @@ class RestSimpleHandler : public RestCursorHandler {
/// to not be interrupted and is guaranteed to get a complete queryResult.
//////////////////////////////////////////////////////////////////////////////
void handleQueryResult() override final;
RestStatus handleQueryResult() override final;
//////////////////////////////////////////////////////////////////////////////
/// @brief handle result of a remove-by-keys query

View File

@ -67,20 +67,6 @@ RestStatus RestSimpleQueryHandler::execute() {
return RestStatus::DONE;
}
RestStatus RestSimpleQueryHandler::continueExecute() {
// extract the sub-request type
rest::RequestType const type = _request->requestType();
if (type == rest::RequestType::PUT) {
return processQuery();
}
// NOT YET IMPLEMENTED
TRI_ASSERT(false);
return RestStatus::DONE;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief was docuBlock JSA_put_api_simple_all
////////////////////////////////////////////////////////////////////////////////
@ -168,10 +154,7 @@ RestStatus RestSimpleQueryHandler::allDocuments() {
data.close();
// now run the actual query and handle the result
if (registerQueryOrCursor(data.slice())) {
return processQuery();
}
return RestStatus::DONE;
return registerQueryOrCursor(data.slice());
}
//////////////////////////////////////////////////////////////////////////////
@ -235,15 +218,9 @@ RestStatus RestSimpleQueryHandler::allDocumentKeys() {
data.openObject(); // bindVars
data.add("@collection", VPackValue(collectionName));
data.close(); // bindVars
data.close();
if (registerQueryOrCursor(data.slice())) {
// We do not support streaming here!
// now run the actual query and handle the result
return processQuery();
}
return RestStatus::DONE;
return registerQueryOrCursor(data.slice());
}
static void buildExampleQuery(VPackBuilder& result,
@ -334,8 +311,5 @@ RestStatus RestSimpleQueryHandler::byExample() {
data.add("count", VPackSlice::trueSlice());
data.close();
if (registerQueryOrCursor(data.slice())) {
return processQuery();
}
return RestStatus::DONE;
return registerQueryOrCursor(data.slice());
}

View File

@ -39,7 +39,6 @@ class RestSimpleQueryHandler : public RestCursorHandler {
public:
RestStatus execute() override final;
RestStatus continueExecute() override final;
char const* name() const override final { return "RestSimpleQueryHandler"; }
private:

View File

@ -228,21 +228,5 @@ RestStatus RocksDBRestExportHandler::createCursor() {
VPackBuilder queryBody = buildQueryOptions(name, body);
TRI_ASSERT(_query == nullptr);
if (registerQueryOrCursor(queryBody.slice())) {
return processQuery();
}
return RestStatus::DONE;
}
RestStatus RocksDBRestExportHandler::continueExecute() {
// extract the sub-request type
rest::RequestType const type = _request->requestType();
if (type == rest::RequestType::POST) {
return processQuery();
}
// NOT YET IMPLEMENTED
TRI_ASSERT(false);
return RestStatus::DONE;
return registerQueryOrCursor(queryBody.slice());
}

View File

@ -40,7 +40,6 @@ class RocksDBRestExportHandler : public RestCursorHandler {
public:
RestStatus execute() override;
RestStatus continueExecute() override;
private:
//////////////////////////////////////////////////////////////////////////////

View File

@ -108,7 +108,7 @@ class Cursor {
* Second: Result If State==DONE this contains Error information or NO_ERROR. On NO_ERROR result is filled.
*/
virtual std::pair<aql::ExecutionState, Result> dump(
velocypack::Builder& result, std::function<void()>& continueHandler) = 0;
velocypack::Builder& result, std::function<void()> const&) = 0;
/**
* @brief Dump the cursor result. This is guaranteed to return the result in this thread.