1
0
Fork 0

Fixed 2.8 dump compatibility and fixed a few more HTTP replication tests.

This commit is contained in:
Dan Larkin 2017-04-28 16:50:39 -04:00
parent e6611ddc4f
commit b46ac15300
5 changed files with 84 additions and 111 deletions

View File

@ -985,32 +985,11 @@ describe ArangoDB do
cmd = api + "/dump?collection=UnitTestsReplication&batchId=#{@batchId}"
doc = ArangoDB.log_get("#{prefix}-deleted", cmd, :body => "", :format => :plain)
doc.code.should eq(200)
doc.code.should eq(204)
doc.headers["x-arango-replication-checkmore"].should eq("false")
doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/)
doc.headers["x-arango-replication-lastincluded"].should_not eq("0")
doc.headers["x-arango-replication-lastincluded"].should eq("0")
doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8")
body = doc.response.body
i = 0
while 1
position = body.index("\n")
break if position == nil
part = body.slice(0, position)
document = JSON.parse(part)
document['type'].should eq(2302)
document['data']['_key'].should match(/^test[0-9]+$/)
body = body.slice(position + 1, body.length)
i = i + 1
end
i.should eq(10)
end
it "checks the dump for a truncated collection" do
@ -1037,34 +1016,11 @@ describe ArangoDB do
cmd = api + "/dump?collection=UnitTestsReplication&batchId=#{@batchId}"
doc = ArangoDB.log_get("#{prefix}-truncated", cmd, :body => "", :format => :plain)
doc.code.should eq(200)
doc.code.should eq(204)
doc.headers["x-arango-replication-checkmore"].should eq("false")
doc.headers["x-arango-replication-lastincluded"].should match(/^\d+$/)
doc.headers["x-arango-replication-lastincluded"].should_not eq("0")
doc.headers["x-arango-replication-lastincluded"].should eq("0")
doc.headers["content-type"].should eq("application/x-arango-dump; charset=utf-8")
body = doc.response.body
i = 0
while 1
position = body.index("\n")
break if position == nil
part = body.slice(0, position)
document = JSON.parse(part)
document['type'].should eq(2302)
# truncate order is undefined
document['data']['_key'].should match(/^test\d+$/)
document['data']['_rev'].should match(/^[a-zA-Z0-9_\-]+$/)
body = body.slice(position + 1, body.length)
i = i + 1
end
i.should eq(10)
end
it "checks the dump for a non-empty collection, 3.0 mode" do

View File

@ -26,7 +26,6 @@
#include "Basics/StringBuffer.h"
#include "Basics/StringRef.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "VocBase/replication-common.h"
#include "RocksDBEngine/RocksDBCollection.h"
#include "RocksDBEngine/RocksDBCommon.h"
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
@ -130,7 +129,7 @@ RocksDBReplicationContext::getInventory(TRI_vocbase_t* vocbase,
// creating a new iterator if one does not exist for this collection
RocksDBReplicationResult RocksDBReplicationContext::dump(
TRI_vocbase_t* vocbase, std::string const& collectionName,
basics::StringBuffer& buff, uint64_t chunkSize) {
basics::StringBuffer& buff, uint64_t chunkSize, bool compat28) {
TRI_ASSERT(vocbase != nullptr);
if (_trx.get() == nullptr) {
return RocksDBReplicationResult(TRI_ERROR_BAD_PARAMETER, _lastTick);
@ -142,12 +141,15 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
// set type
int type = REPLICATION_MARKER_DOCUMENT; // documents
if (compat28 && (_collection->type() == TRI_COL_TYPE_EDGE)) {
type = 2301; // 2.8 compatibility edges
}
arangodb::basics::VPackStringBufferAdapter adapter(buff.stringBuffer());
VPackBuilder builder(&_vpackOptions);
auto cb = [this, &type, &buff, &adapter,
auto cb = [this, &type, &buff, &adapter, &compat28,
&builder](DocumentIdentifierToken const& token) {
builder.clear();
@ -167,7 +169,9 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
builder.add(VPackValue("data"));
auto key = VPackSlice(_mdr.vpack()).get(StaticStrings::KeyString);
_mdr.addToBuilder(builder, false);
builder.add("key", key);
if (compat28) {
builder.add("key", key);
}
builder.close();
VPackDumper dumper(
@ -180,7 +184,7 @@ RocksDBReplicationResult RocksDBReplicationContext::dump(
while (_hasMore && buff.length() < chunkSize) {
try {
_hasMore = _iter->next(cb, 10); // TODO: adjust limit?
_hasMore = _iter->next(cb, 1); // TODO: adjust limit?
} catch (std::exception const& ex) {
_hasMore = false;
return RocksDBReplicationResult(TRI_ERROR_INTERNAL, _lastTick);

View File

@ -54,7 +54,7 @@ class RocksDBReplicationContext {
TRI_voc_tick_t id() const;
uint64_t lastTick() const;
uint64_t count() const;
TRI_vocbase_t* vocbase() const {
if (_trx == nullptr) {
return nullptr;
@ -74,7 +74,8 @@ class RocksDBReplicationContext {
// creating a new iterator if one does not exist for this collection
RocksDBReplicationResult dump(TRI_vocbase_t* vocbase,
std::string const& collectionName,
basics::StringBuffer&, uint64_t chunkSize);
basics::StringBuffer&, uint64_t chunkSize,
bool compat28);
// iterates over all documents in a collection, previously bound with
// bindCollection. Generates array of objects with minKey, maxKey and hash

View File

@ -80,17 +80,17 @@ class WALParser : public rocksdb::WriteBatch::Handler {
public:
explicit WALParser(TRI_vocbase_t* vocbase, uint64_t from, size_t& limit,
bool includeSystem, VPackBuilder& builder)
: _vocbase(vocbase),
_from(from),
_limit(limit),
_includeSystem(includeSystem),
_builder(builder) {}
: _vocbase(vocbase),
_from(from),
_limit(limit),
_includeSystem(includeSystem),
_builder(builder) {}
void LogData(rocksdb::Slice const& blob) override {
if (_currentSequence < _from) {
return;
}
RocksDBLogType type = RocksDBLogValue::type(blob);
TRI_DEFER(_lastLogType = type);
switch (type) {
@ -139,9 +139,8 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_currentCollectionId = RocksDBLogValue::collectionId(blob);
TRI_idx_iid_t iid = RocksDBLogValue::indexId(blob);
_builder.openObject();
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_INDEX_DROP)));
_builder.add("type",
VPackValue(static_cast<uint64_t>(REPLICATION_INDEX_DROP)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("data", VPackValue(VPackValueType::Object));
@ -221,6 +220,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
_builder.add("type", VPackValue(convertLogType(_lastLogType)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
_builder.add("cname", RocksDBValue::data(value).get("name"));
if (_lastLogType == RocksDBLogType::CollectionRename) {
VPackSlice collectionData(value.data());
@ -273,7 +273,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
if (_currentSequence < _from) {
return;
}
switch (RocksDBKey::type(key)) {
case RocksDBEntryType::Collection: {
TRI_ASSERT(_lastLogType == RocksDBLogType::CollectionDrop);
@ -302,13 +302,13 @@ class WALParser : public rocksdb::WriteBatch::Handler {
TRI_ASSERT(!_seenBeginTransaction || _currentTrxId != 0);
TRI_ASSERT(_currentDbId != 0 && _currentCollectionId != 0);
TRI_ASSERT(!_removeDocumentKey.empty());
uint64_t revisionId = RocksDBKey::revisionId(key);
_builder.openObject();
_builder.add("tick", VPackValue(std::to_string(_currentSequence)));
_builder.add(
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
"type",
VPackValue(static_cast<uint64_t>(REPLICATION_MARKER_REMOVE)));
_builder.add("database", VPackValue(std::to_string(_currentDbId)));
_builder.add("cid", VPackValue(std::to_string(_currentCollectionId)));
if (_singleOpTransaction) { // single op is defined to 0
@ -327,7 +327,7 @@ class WALParser : public rocksdb::WriteBatch::Handler {
default:
break; // shouldn't get here?
}
if (_limit > 0) {
_limit--;
}
@ -425,7 +425,6 @@ RocksDBReplicationResult rocksutils::tailWal(TRI_vocbase_t* vocbase,
uint64_t from, size_t limit,
bool includeSystem,
VPackBuilder& builder) {
uint64_t lastTick = from;
std::unique_ptr<WALParser> handler(
new WALParser(vocbase, from, limit, includeSystem, builder));

View File

@ -26,8 +26,8 @@
#include "ApplicationFeatures/ApplicationServer.h"
#include "Basics/ConditionLocker.h"
#include "Basics/ReadLocker.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/VPackStringBufferAdapter.h"
#include "Basics/VelocyPackHelper.h"
#include "Basics/conversions.h"
#include "Basics/files.h"
#include "Cluster/ClusterComm.h"
@ -580,86 +580,87 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
if (_request->transportType() == Endpoint::TransportType::VPP) {
useVpp = true;
}
// determine start and end tick
TRI_voc_tick_t tickStart = 0;
TRI_voc_tick_t tickEnd = UINT64_MAX;
bool found;
std::string const& value1 = _request->value("from", found);
if (found) {
tickStart = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value1));
}
// determine end tick for dump
std::string const& value2 = _request->value("to", found);
if (found) {
tickEnd = static_cast<TRI_voc_tick_t>(StringUtils::uint64(value2));
}
if (found && (tickStart > tickEnd || tickEnd == 0)) {
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
"invalid from/to values");
return;
}
bool includeSystem = true;
std::string const& value4 = _request->value("includeSystem", found);
if (found) {
includeSystem = StringUtils::boolean(value4);
}
size_t limit = 10000; // TODO: determine good default value?
std::string const& value5 = _request->value("chunkSize", found);
if (found) {
limit = static_cast<size_t>(StringUtils::uint64(value5));
}
std::shared_ptr<transaction::Context> transactionContext =
transaction::StandaloneContext::Create(_vocbase);
transaction::StandaloneContext::Create(_vocbase);
VPackBuilder builder(transactionContext->getVPackOptions());
builder.openArray();
auto result = tailWal(_vocbase, tickStart, limit, includeSystem, builder);
builder.close();
auto data = builder.slice();
if (result.fail()) {
generateError(rest::ResponseCode::SERVER_ERROR, result.errorNumber(), result.errorMessage());
generateError(rest::ResponseCode::SERVER_ERROR, result.errorNumber(),
result.errorMessage());
return;
}
bool const checkMore =
(result.maxTick() > 0 && result.maxTick() < latestSequenceNumber());
(result.maxTick() > 0 && result.maxTick() < latestSequenceNumber());
// generate the result
size_t length = data.length();
if (length == 0) {
resetResponse(rest::ResponseCode::NO_CONTENT);
} else {
resetResponse(rest::ResponseCode::OK);
}
// transfer ownership of the buffer contents
_response->setContentType(rest::ContentType::DUMP);
// set headers
_response->setHeaderNC(TRI_REPLICATION_HEADER_CHECKMORE,
checkMore ? "true" : "false");
_response->setHeaderNC(
TRI_REPLICATION_HEADER_LASTINCLUDED,
StringUtils::itoa((length == 0) ? 0 : result.maxTick()));
TRI_REPLICATION_HEADER_LASTINCLUDED,
StringUtils::itoa((length == 0) ? 0 : result.maxTick()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK,
StringUtils::itoa(latestSequenceNumber()));
_response->setHeaderNC(TRI_REPLICATION_HEADER_ACTIVE, "true");
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT,
result.fromTickIncluded() ? "true" : "false");
if (length > 0) {
if (useVpp) {
for (auto message : arangodb::velocypack::ArrayIterator(data)) {
@ -667,18 +668,19 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
transactionContext->getVPackOptions(), true);
}
} else {
HttpResponse* httpResponse =
dynamic_cast<HttpResponse*>(_response.get());
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"invalid response type");
}
basics::StringBuffer& buffer = httpResponse->body();
arangodb::basics::VPackStringBufferAdapter adapter(buffer.stringBuffer());
VPackDumper dumper(&adapter,
transactionContext->getVPackOptions()); // note: we need the CustomTypeHandler here
VPackDumper dumper(
&adapter,
transactionContext
->getVPackOptions()); // note: we need the CustomTypeHandler here
for (auto marker : arangodb::velocypack::ArrayIterator(data)) {
dumper.dump(marker);
httpResponse->body().appendChar('\n');
@ -687,10 +689,10 @@ void RocksDBRestReplicationHandler::handleCommandLoggerFollow() {
// add client
bool found;
std::string const& value = _request->value("serverId", found);
if (found) {
TRI_server_id_t serverId = (TRI_server_id_t)StringUtils::uint64(value);
if (serverId > 0) {
_vocbase->updateReplicationClient(serverId, result.maxTick());
}
@ -710,17 +712,18 @@ void RocksDBRestReplicationHandler::handleCommandDetermineOpenTransactions() {
if (_request->transportType() == Endpoint::TransportType::VPP) {
useVpp = true;
}
//_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, StringUtils::itoa(dump._lastFoundTick));
//_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK,
// StringUtils::itoa(dump._lastFoundTick));
_response->setHeaderNC(TRI_REPLICATION_HEADER_LASTTICK, "0");
_response->setContentType(rest::ContentType::DUMP);
//_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, dump._fromTickIncluded ? "true" : "false");
//_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT,
// dump._fromTickIncluded ? "true" : "false");
_response->setHeaderNC(TRI_REPLICATION_HEADER_FROMPRESENT, "true");
VPackSlice slice = VelocyPackHelper::EmptyArrayValue();
if (useVpp) {
_response->addPayload(slice, &VPackOptions::Defaults, false);
} else {
HttpResponse* httpResponse =
dynamic_cast<HttpResponse*>(_response.get());
HttpResponse* httpResponse = dynamic_cast<HttpResponse*>(_response.get());
if (httpResponse == nullptr) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
@ -789,9 +792,11 @@ void RocksDBRestReplicationHandler::handleCommandInventory() {
builder.add("running", VPackValue(true));
builder.add("lastLogTick", VPackValue(std::to_string(ctx->lastTick())));
builder.add("lastUncommittedLogTick",
VPackValue(std::to_string(ctx->lastTick()))); // s.lastAssignedTick
builder.add("totalEvents", VPackValue(ctx->lastTick())); // s.numEvents + s.numEventsSync
builder.add(
"lastUncommittedLogTick",
VPackValue(std::to_string(ctx->lastTick()))); // s.lastAssignedTick
builder.add("totalEvents",
VPackValue(ctx->lastTick())); // s.numEvents + s.numEventsSync
builder.add("time", VPackValue(utilities::timeString()));
builder.close(); // state
@ -1285,6 +1290,13 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
return;
}
bool compat28 = false;
std::string const& value8 = _request->value("compat28", found);
if (found) {
compat28 = StringUtils::boolean(value8);
}
// print request
LOG_TOPIC(TRACE, arangodb::Logger::FIXME)
<< "requested collection dump for collection '" << collection
@ -1299,7 +1311,8 @@ void RocksDBRestReplicationHandler::handleCommandDump() {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid response type");
}
auto result = context->dump(_vocbase, collection, dump, determineChunkSize());
auto result =
context->dump(_vocbase, collection, dump, determineChunkSize(), compat28);
// generate the result
if (dump.length() == 0) {