1
0
Fork 0

Bug fix/input row serialization (#10502)

* Unified toVelocyPack of InputRow and AqlItemBlock. We can now velocypack any range within an AqlItemBlock as a new block.

* This is the fix required here. We now end exhausted false in initialize cursor. For some reason the other side checks this hard-coded value and if it is not the expected value it does ignore the send body ¯\_(ツ)_/¯

* Added test for range-based serialization
This commit is contained in:
Michael Hackstein 2019-11-22 08:08:53 +01:00 committed by GitHub
parent b94f40d890
commit c201dce948
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 167 additions and 185 deletions

View File

@ -535,6 +535,13 @@ SharedAqlItemBlockPtr AqlItemBlock::steal(std::vector<size_t> const& chosen,
return res;
}
/// @brief toJson, transfer all rows of this AqlItemBlock to Json, the result
/// can be used to recreate the AqlItemBlock via the Json constructor
void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions,
VPackBuilder& result) const {
return toVelocyPack(0, size(), trxOptions, result);
}
/// @brief toJson, transfer a whole AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// Here is a description of the data format: The resulting Json has
@ -568,7 +575,15 @@ SharedAqlItemBlockPtr AqlItemBlock::steal(std::vector<size_t> const& chosen,
/// corresponding position
/// "raw": List of actual values, positions 0 and 1 are always null
/// such that actual indices start at 2
void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPackBuilder& result) const {
void AqlItemBlock::toVelocyPack(size_t from, size_t to,
velocypack::Options const* const trxOptions,
VPackBuilder& result) const {
// Can only have positive slice size
TRI_ASSERT(from < to);
// We cannot slice over the upper bound.
// The lower bound (0) is protected by unsigned number type
TRI_ASSERT(to <= _nrItems);
TRI_ASSERT(result.isOpenObject());
VPackOptions options(VPackOptions::Defaults);
options.buildUnindexedArrays = true;
@ -580,7 +595,7 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa
raw.add(VPackValue(VPackValueType::Null));
raw.add(VPackValue(VPackValueType::Null));
result.add("nrItems", VPackValue(_nrItems));
result.add("nrItems", VPackValue(to - from));
result.add("nrRegs", VPackValue(_nrRegs));
result.add(StaticStrings::Error, VPackValue(false));
@ -639,7 +654,7 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa
startRegister = 1;
}
for (RegisterId column = startRegister; column < internalNrRegs(); column++) {
for (size_t i = 0; i < _nrItems; i++) {
for (size_t i = from; i < to; i++) {
AqlValue const& a(_data[i * internalNrRegs() + column]);
// determine current state
@ -701,7 +716,8 @@ void AqlItemBlock::toVelocyPack(velocypack::Options const* const trxOptions, VPa
result.add("raw", raw.slice());
}
void AqlItemBlock::rowToSimpleVPack(size_t const row, velocypack::Options const* options, arangodb::velocypack::Builder& builder) const {
void AqlItemBlock::rowToSimpleVPack(size_t const row, velocypack::Options const* options,
arangodb::velocypack::Builder& builder) const {
VPackArrayBuilder rowBuilder{&builder};
if (isShadowRow(row)) {

View File

@ -206,10 +206,18 @@ class AqlItemBlock {
/// to which our AqlValues point will vanish.
SharedAqlItemBlockPtr steal(std::vector<size_t> const& chosen, size_t from, size_t to);
/// @brief toJson, transfer a whole AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// @brief toJson, transfer all rows of this AqlItemBlock to Json, the result
/// can be used to recreate the AqlItemBlock via the Json constructor
void toVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const;
/// @brief toJson, transfer a slice of this AqlItemBlock to Json, the result can
/// be used to recreate the AqlItemBlock via the Json constructor
/// The slice will be starting at line `from` (including) and end at line `to` (excluding).
/// Only calls with 0 <= from < to <= this.size() are allowed.
/// If you want to transfer the full block, use from == 0, to == this.size()
void toVelocyPack(size_t from, size_t to, velocypack::Options const*,
arangodb::velocypack::Builder&) const;
/// @brief Creates a human-readable velocypack of the block. Adds an object
/// `{nrItems, nrRegs, matrix}` to the builder.
///

View File

@ -119,135 +119,17 @@ SharedAqlItemBlockPtr InputAqlItemRow::cloneToBlock(AqlItemBlockManager& manager
/// corresponding position
/// "raw": List of actual values, positions 0 and 1 are always null
/// such that actual indices start at 2
void InputAqlItemRow::toVelocyPack(velocypack::Options const* const trxOptions, VPackBuilder& result) const {
void InputAqlItemRow::toVelocyPack(velocypack::Options const* const trxOptions,
VPackBuilder& result) const {
TRI_ASSERT(isInitialized());
TRI_ASSERT(result.isOpenObject());
VPackOptions options(VPackOptions::Defaults);
options.buildUnindexedArrays = true;
options.buildUnindexedObjects = true;
_block->toVelocyPack(_baseIndex, _baseIndex + 1, trxOptions, result);
}
VPackBuilder raw(&options);
raw.openArray();
// Two nulls in the beginning such that indices start with 2
raw.add(VPackValue(VPackValueType::Null));
raw.add(VPackValue(VPackValueType::Null));
result.add("nrItems", VPackValue(1));
result.add("nrRegs", VPackValue(getNrRegisters()));
result.add(StaticStrings::Error, VPackValue(false));
enum State {
Empty, // saw an empty value
Range, // saw a range value
Next, // saw a previously unknown value
Positional, // saw a value previously encountered
};
std::unordered_map<AqlValue, size_t> table; // remember duplicates
size_t lastTablePos = 0;
State lastState = Positional;
State currentState = Positional;
size_t runLength = 0;
size_t tablePos = 0;
result.add("data", VPackValue(VPackValueType::Array));
// write out data buffered for repeated "empty" or "next" values
auto writeBuffered = [](State lastState, size_t lastTablePos,
VPackBuilder& result, size_t runLength) {
if (lastState == Range) {
return;
}
if (lastState == Positional) {
if (lastTablePos >= 2) {
if (runLength == 1) {
result.add(VPackValue(lastTablePos));
} else {
result.add(VPackValue(-4));
result.add(VPackValue(runLength));
result.add(VPackValue(lastTablePos));
}
}
} else {
TRI_ASSERT(lastState == Empty || lastState == Next);
if (runLength == 1) {
// saw exactly one value
result.add(VPackValue(lastState == Empty ? 0 : 1));
} else {
// saw multiple values
result.add(VPackValue(lastState == Empty ? -1 : -3));
result.add(VPackValue(runLength));
}
}
};
size_t pos = 2; // write position in raw
// We use column == 0 to simulate a shadowRow
// this is only relevant if all participants can use shadow rows
RegisterId startRegister = 0;
if (block().getFormatType() == SerializationFormat::CLASSIC) {
// Skip over the shadowRows
startRegister = 1;
}
for (RegisterId column = startRegister; column < getNrRegisters() + 1; column++) {
AqlValue const& a = column == 0 ? AqlValue{} : getValue(column - 1);
// determine current state
if (a.isEmpty()) {
currentState = Empty;
} else if (a.isRange()) {
currentState = Range;
} else {
auto it = table.find(a);
if (it == table.end()) {
currentState = Next;
a.toVelocyPack(trxOptions, raw, false);
table.try_emplace(a, pos++);
} else {
currentState = Positional;
tablePos = it->second;
TRI_ASSERT(tablePos >= 2);
if (lastState != Positional) {
lastTablePos = tablePos;
}
}
}
// handle state change
if (currentState != lastState || (currentState == Positional && tablePos != lastTablePos)) {
// write out remaining buffered data in case of a state change
writeBuffered(lastState, lastTablePos, result, runLength);
lastTablePos = 0;
lastState = currentState;
runLength = 0;
}
switch (currentState) {
case Empty:
case Next:
case Positional:
++runLength;
lastTablePos = tablePos;
break;
case Range:
result.add(VPackValue(-2));
result.add(VPackValue(a.range()->_low));
result.add(VPackValue(a.range()->_high));
break;
}
}
// write out any remaining buffered data
writeBuffered(lastState, lastTablePos, result, runLength);
result.close(); // closes "data"
raw.close();
result.add("raw", raw.slice());
void InputAqlItemRow::toSimpleVelocyPack(velocypack::Options const* trxOpts,
arangodb::velocypack::Builder& result) const {
TRI_ASSERT(_block != nullptr);
_block->rowToSimpleVPack(_baseIndex, trxOpts, result);
}
InputAqlItemRow::InputAqlItemRow(SharedAqlItemBlockPtr const& block, size_t baseIndex)
@ -280,7 +162,9 @@ AqlValue InputAqlItemRow::stealValue(RegisterId registerId) {
return a;
}
RegisterCount InputAqlItemRow::getNrRegisters() const noexcept { return block().getNrRegs(); }
RegisterCount InputAqlItemRow::getNrRegisters() const noexcept {
return block().getNrRegs();
}
bool InputAqlItemRow::operator==(InputAqlItemRow const& other) const noexcept {
return this->_block == other._block && this->_baseIndex == other._baseIndex;

View File

@ -36,7 +36,7 @@ namespace arangodb {
namespace velocypack {
class Builder;
struct Options;
}
} // namespace velocypack
namespace aql {
class AqlItemBlock;
@ -133,6 +133,8 @@ class InputAqlItemRow {
/// Uses the same API as an AqlItemBlock with only a single row
void toVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const;
void toSimpleVelocyPack(velocypack::Options const*, arangodb::velocypack::Builder&) const;
private:
AqlItemBlock& block() noexcept;

View File

@ -83,7 +83,6 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
}
std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecutor>::getSomeWithoutTrace(size_t atMost) {
// silence tests -- we need to introduce new failure tests for fetchers
TRI_IF_FAILURE("ExecutionBlock::getOrSkipSome1") {
THROW_ARANGO_EXCEPTION(TRI_ERROR_DEBUG);
@ -98,9 +97,9 @@ std::pair<ExecutionState, SharedAqlItemBlockPtr> ExecutionBlockImpl<RemoteExecut
if (getQuery().killed()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
}
std::unique_lock<std::mutex> guard(_communicationMutex);
if (_requestInFlight) {
// Already sent a shutdown request, but haven't got an answer yet.
return {ExecutionState::WAITING, nullptr};
@ -167,14 +166,13 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSome(s
}
std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWithoutTrace(size_t atMost) {
std::unique_lock<std::mutex> guard(_communicationMutex);
if (_requestInFlight) {
// Already sent a shutdown request, but haven't got an answer yet.
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
if (_lastError.fail()) {
TRI_ASSERT(_lastResponse == nullptr);
Result res = _lastError;
@ -182,7 +180,7 @@ std::pair<ExecutionState, size_t> ExecutionBlockImpl<RemoteExecutor>::skipSomeWi
// we were called with an error need to throw it.
THROW_ARANGO_EXCEPTION(res);
}
if (getQuery().killed()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
}
@ -255,13 +253,13 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
// will initialize the cursor lazily
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};
}
if (getQuery().killed()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_QUERY_KILLED);
}
std::unique_lock<std::mutex> guard(_communicationMutex);
if (_requestInFlight) {
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
@ -271,7 +269,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
auto response = std::move(_lastResponse);
// Result is the response which is an object containing the ErrorCode
int errorNumber = TRI_ERROR_INTERNAL; // default error code
int errorNumber = TRI_ERROR_INTERNAL; // default error code
VPackSlice slice = response->slice();
VPackSlice errorSlice = slice.get(StaticStrings::ErrorNum);
if (!errorSlice.isNumber()) {
@ -297,6 +295,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
VPackBuilder builder(buffer, &options);
builder.openObject(/*unindexed*/ true);
// Required for 3.5.* and earlier, dropped in 3.6.0
builder.add("exhausted", VPackValue(false));
// Used in 3.4.0 onwards
builder.add("done", VPackValue(false));
builder.add(StaticStrings::Code, VPackValue(TRI_ERROR_NO_ERROR));
@ -306,7 +306,8 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
builder.add("pos", VPackValue(0));
builder.add(VPackValue("items"));
builder.openObject(/*unindexed*/ true);
input.toVelocyPack(_engine->getQuery()->trx()->transactionContextPtr()->getVPackOptions(), builder);
input.toVelocyPack(_engine->getQuery()->trx()->transactionContextPtr()->getVPackOptions(),
builder);
builder.close();
builder.close();
@ -324,20 +325,19 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::initialize
/// @brief shutdown, will be called exactly once for the whole query
std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(int errorCode) {
// this should make the whole thing idempotent
if (!_isResponsibleForInitializeCursor) {
// do nothing...
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};
}
std::unique_lock<std::mutex> guard(_communicationMutex);
if (!_hasTriggeredShutdown) {
// skip request in progress
std::ignore = generateRequestTicket();
_hasTriggeredShutdown = true;
// For every call we simply forward via HTTP
VPackBuffer<uint8_t> buffer;
VPackBuilder builder(buffer);
@ -352,18 +352,18 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
if (!res.ok()) {
THROW_ARANGO_EXCEPTION(res);
}
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
if (_requestInFlight) {
// Already sent a shutdown request, but haven't got an answer yet.
return {ExecutionState::WAITING, TRI_ERROR_NO_ERROR};
}
if (_lastError.fail()) {
// _didReceiveShutdownRequest = true;
// _didReceiveShutdownRequest = true;
TRI_ASSERT(_lastResponse == nullptr);
Result res = std::move(_lastError);
_lastError.reset();
@ -385,7 +385,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
if (_lastResponse != nullptr) {
TRI_ASSERT(_lastError.ok());
auto response = std::move(_lastResponse);
// both must be reset before return or throw
@ -420,7 +420,7 @@ std::pair<ExecutionState, Result> ExecutionBlockImpl<RemoteExecutor>::shutdown(i
return {ExecutionState::DONE, TRI_ERROR_INTERNAL};
}
TRI_ASSERT(false);
return {ExecutionState::DONE, TRI_ERROR_NO_ERROR};
}
@ -442,7 +442,7 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
.append(spec.serverId)
.append("': ");
}
int res = TRI_ERROR_INTERNAL;
if (err != fuerte::Error::NoError) {
res = network::fuerteToArangoErrorCode(err);
@ -455,7 +455,8 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
res = VelocyPackHelper::getNumericValue(slice, StaticStrings::ErrorNum, res);
VPackStringRef ref =
VelocyPackHelper::getStringRef(slice, StaticStrings::ErrorMessage,
VPackStringRef("(no valid error in response)"));
VPackStringRef(
"(no valid error in response)"));
msg.append(ref.data(), ref.size());
}
}
@ -468,7 +469,6 @@ Result handleErrorResponse(network::EndpointSpec const& spec, fuerte::Error err,
Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb type,
std::string const& urlPart,
VPackBuffer<uint8_t>&& body) {
NetworkFeature const& nf =
_engine->getQuery()->vocbase().server().getFeature<NetworkFeature>();
network::ConnectionPool* pool = nf.pool();
@ -485,42 +485,39 @@ Result ExecutionBlockImpl<RemoteExecutor>::sendAsyncRequest(fuerte::RestVerb typ
TRI_ASSERT(!spec.endpoint.empty());
auto req = fuerte::createRequest(type, fuerte::ContentType::VPack);
req->header.database =_query.vocbase().name();
req->header.database = _query.vocbase().name();
req->header.path = urlPart + _queryId;
req->addVPack(std::move(body));
// Later, we probably want to set these sensibly:
req->timeout(kDefaultTimeOutSecs);
if (!_ownName.empty()) {
req->header.addMeta("Shard-Id", _ownName);
}
network::ConnectionPtr conn = pool->leaseConnection(spec.endpoint);
_requestInFlight = true;
auto ticket = generateRequestTicket();
conn->sendRequest(std::move(req),
[this, ticket, spec,
sqs = _query.sharedState()](fuerte::Error err,
std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
// `this` is only valid as long as sharedState is valid.
// So we must execute this under sharedState's mutex.
sqs->executeAndWakeup([&] {
std::lock_guard<std::mutex> guard(_communicationMutex);
if (_lastTicket == ticket) {
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
_requestInFlight = false;
return true;
}
return false;
});
});
conn->sendRequest(std::move(req), [this, ticket, spec, sqs = _query.sharedState()](
fuerte::Error err, std::unique_ptr<fuerte::Request> req,
std::unique_ptr<fuerte::Response> res) {
// `this` is only valid as long as sharedState is valid.
// So we must execute this under sharedState's mutex.
sqs->executeAndWakeup([&] {
std::lock_guard<std::mutex> guard(_communicationMutex);
if (_lastTicket == ticket) {
if (err != fuerte::Error::NoError || res->statusCode() >= 400) {
_lastError = handleErrorResponse(spec, err, res.get());
} else {
_lastResponse = std::move(res);
}
_requestInFlight = false;
return true;
}
return false;
});
});
++_engine->_stats.requests;
@ -559,6 +556,10 @@ void ExecutionBlockImpl<RemoteExecutor>::traceRequest(char const* const rpc,
LOG_TOPIC("92c71", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] remote request sent: " << rpc
<< (args.empty() ? "" : " ") << args << " registryId=" << remoteQueryId;
if (_profile >= PROFILE_LEVEL_TRACE_2) {
LOG_TOPIC("e0ae6", INFO, Logger::QUERIES)
<< "[query#" << queryId << "] data: " << slice.toJson();
}
}
}

View File

@ -351,6 +351,77 @@ TEST_F(AqlItemBlockTest, test_serialization_deserialization_slices) {
}
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_with_ranges) {
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 3, 2)};
block->emplaceValue(0, 0, dummyData(4));
block->emplaceValue(0, 1, dummyData(5));
block->emplaceValue(1, 0, dummyData(0));
block->emplaceValue(1, 1, dummyData(1));
block->emplaceValue(2, 0, dummyData(2));
block->emplaceValue(2, 1, dummyData(3));
{
// test range 0->1
VPackBuilder result;
result.openObject();
block->toVelocyPack(0, 1, nullptr, result);
ASSERT_TRUE(result.isOpenObject());
result.close();
SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice());
// Check exposed attributes
EXPECT_EQ(testee->size(), 1);
EXPECT_EQ(testee->getNrRegs(), block->getNrRegs());
// check data
compareWithDummy(testee, 0, 0, 4);
compareWithDummy(testee, 0, 1, 5);
assertShadowRowIndexes(testee, {});
}
{
// Test range 1->2
VPackBuilder result;
result.openObject();
block->toVelocyPack(1, 2, nullptr, result);
ASSERT_TRUE(result.isOpenObject());
result.close();
SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice());
// Check exposed attributes
EXPECT_EQ(testee->size(), 1);
EXPECT_EQ(testee->getNrRegs(), block->getNrRegs());
// check data
compareWithDummy(testee, 0, 0, 0);
compareWithDummy(testee, 0, 1, 1);
assertShadowRowIndexes(testee, {});
}
{
// Test range 0->2
VPackBuilder result;
result.openObject();
block->toVelocyPack(0, 2, nullptr, result);
ASSERT_TRUE(result.isOpenObject());
result.close();
SharedAqlItemBlockPtr testee = itemBlockManager.requestAndInitBlock(result.slice());
// Check exposed attributes
EXPECT_EQ(testee->size(), 2);
EXPECT_EQ(testee->getNrRegs(), block->getNrRegs());
// check data
compareWithDummy(testee, 0, 0, 4);
compareWithDummy(testee, 0, 1, 5);
compareWithDummy(testee, 1, 0, 0);
compareWithDummy(testee, 1, 1, 1);
assertShadowRowIndexes(testee, {});
}
}
TEST_F(AqlItemBlockTest, test_serialization_deserialization_input_row) {
SharedAqlItemBlockPtr block{new AqlItemBlock(itemBlockManager, 2, 2)};
block->emplaceValue(0, 0, dummyData(4));