mirror of https://gitee.com/bigwinds/arangodb
Fixed some bugs.
This commit is contained in:
parent
e0e00b0b6b
commit
2e0fdbf666
|
@ -42,17 +42,17 @@ EnumerateCollectionBlock::EnumerateCollectionBlock(
|
|||
: ExecutionBlock(engine, ep),
|
||||
_collection(ep->_collection),
|
||||
_mmdr(new ManagedDocumentResult),
|
||||
_cursor(_trx->indexScan(
|
||||
_collection->getName(),
|
||||
(ep->_random ? transaction::Methods::CursorType::ANY
|
||||
: transaction::Methods::CursorType::ALL),
|
||||
_mmdr.get(), 0, UINT64_MAX, 1000, false)),
|
||||
_cursor(
|
||||
_trx->indexScan(_collection->getName(),
|
||||
(ep->_random ? transaction::Methods::CursorType::ANY
|
||||
: transaction::Methods::CursorType::ALL),
|
||||
_mmdr.get(), 0, UINT64_MAX, 1000, false)),
|
||||
_mustStoreResult(true) {
|
||||
TRI_ASSERT(_cursor->successful());
|
||||
}
|
||||
|
||||
int EnumerateCollectionBlock::initialize() {
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
auto ep = static_cast<EnumerateCollectionNode const*>(_exeNode);
|
||||
_mustStoreResult = ep->isVarUsedLater(ep->_outVariable);
|
||||
|
||||
|
@ -60,9 +60,11 @@ int EnumerateCollectionBlock::initialize() {
|
|||
auto logicalCollection = _collection->getCollection();
|
||||
auto cid = logicalCollection->planId();
|
||||
auto dbName = logicalCollection->dbName();
|
||||
auto collectionInfoCurrent = ClusterInfo::instance()->getCollectionCurrent(dbName, std::to_string(cid));
|
||||
auto collectionInfoCurrent = ClusterInfo::instance()->getCollectionCurrent(
|
||||
dbName, std::to_string(cid));
|
||||
|
||||
double maxWait = _engine->getQuery()->getNumericOption<double>("satelliteSyncWait", 60.0);
|
||||
double maxWait = _engine->getQuery()->getNumericOption<double>(
|
||||
"satelliteSyncWait", 60.0);
|
||||
bool inSync = false;
|
||||
unsigned long waitInterval = 10000;
|
||||
double startTime = TRI_microtime();
|
||||
|
@ -71,7 +73,8 @@ int EnumerateCollectionBlock::initialize() {
|
|||
|
||||
while (!inSync) {
|
||||
auto followers = collectionInfoCurrent->servers(_collection->getName());
|
||||
inSync = std::find(followers.begin(), followers.end(), ServerState::instance()->getId()) != followers.end();
|
||||
inSync = std::find(followers.begin(), followers.end(),
|
||||
ServerState::instance()->getId()) != followers.end();
|
||||
if (!inSync) {
|
||||
if (endTime - now < waitInterval) {
|
||||
waitInterval = static_cast<unsigned long>(endTime - now);
|
||||
|
@ -85,39 +88,41 @@ int EnumerateCollectionBlock::initialize() {
|
|||
}
|
||||
|
||||
if (!inSync) {
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC, "collection " + _collection->name);
|
||||
THROW_ARANGO_EXCEPTION_MESSAGE(
|
||||
TRI_ERROR_CLUSTER_AQL_COLLECTION_OUT_OF_SYNC,
|
||||
"collection " + _collection->name);
|
||||
}
|
||||
}
|
||||
|
||||
return ExecutionBlock::initialize();
|
||||
|
||||
// cppcheck-suppress style
|
||||
DEBUG_END_BLOCK();
|
||||
DEBUG_END_BLOCK();
|
||||
}
|
||||
|
||||
int EnumerateCollectionBlock::initializeCursor(AqlItemBlock* items,
|
||||
size_t pos) {
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
int res = ExecutionBlock::initializeCursor(items, pos);
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
return res;
|
||||
}
|
||||
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
_cursor->reset();
|
||||
DEBUG_END_BLOCK();
|
||||
DEBUG_END_BLOCK();
|
||||
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
|
||||
// cppcheck-suppress style
|
||||
DEBUG_END_BLOCK();
|
||||
DEBUG_END_BLOCK();
|
||||
}
|
||||
|
||||
/// @brief getSome
|
||||
AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
||||
size_t atMost) {
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
traceGetSomeBegin();
|
||||
|
||||
TRI_ASSERT(_cursor.get() != nullptr);
|
||||
|
@ -130,7 +135,7 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
traceGetSomeEnd(nullptr);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
bool needMore;
|
||||
AqlItemBlock* cur = nullptr;
|
||||
size_t send = 0;
|
||||
|
@ -169,7 +174,7 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
TRI_ASSERT(_cursor->hasMore());
|
||||
|
||||
size_t curRegs = cur->getNrRegs();
|
||||
|
||||
|
||||
RegisterId nrRegs =
|
||||
getPlanNode()->getRegisterPlan()->nrRegs[getPlanNode()->getDepth()];
|
||||
|
||||
|
@ -179,15 +184,16 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
|
||||
// only copy 1st row of registers inherited from previous frame(s)
|
||||
inheritRegisters(cur, res.get(), _pos);
|
||||
|
||||
|
||||
auto col = _collection->getCollection();
|
||||
LogicalCollection* c = col.get();
|
||||
std::function<void(DocumentIdentifierToken const& tkn)> cb;
|
||||
if (_mustStoreResult) {
|
||||
cb = [&] (DocumentIdentifierToken const& tkn) {
|
||||
cb = [&](DocumentIdentifierToken const& tkn) {
|
||||
if (c->readDocument(_trx, tkn, *_mmdr)) {
|
||||
// The result is in the first variable of this depth,
|
||||
// we do not need to do a lookup in getPlanNode()->_registerPlan->varInfo,
|
||||
// we do not need to do a lookup in
|
||||
// getPlanNode()->_registerPlan->varInfo,
|
||||
// but can just take cur->getNrRegs() as registerId:
|
||||
uint8_t const* vpack = _mmdr->vpack();
|
||||
if (_mmdr->canUseInExternal()) {
|
||||
|
@ -207,7 +213,7 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
++send;
|
||||
};
|
||||
} else {
|
||||
cb = [&] (DocumentIdentifierToken const& tkn) {
|
||||
cb = [&](DocumentIdentifierToken const& tkn) {
|
||||
if (send > 0) {
|
||||
// re-use already copied AQLValues
|
||||
res->copyValuesFromFirstRow(send, static_cast<RegisterId>(curRegs));
|
||||
|
@ -245,11 +251,11 @@ AqlItemBlock* EnumerateCollectionBlock::getSome(size_t, // atLeast,
|
|||
return res.release();
|
||||
|
||||
// cppcheck-suppress style
|
||||
DEBUG_END_BLOCK();
|
||||
DEBUG_END_BLOCK();
|
||||
}
|
||||
|
||||
size_t EnumerateCollectionBlock::skipSome(size_t atLeast, size_t atMost) {
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
DEBUG_BEGIN_BLOCK();
|
||||
size_t skipped = 0;
|
||||
TRI_ASSERT(_cursor != nullptr);
|
||||
|
||||
|
@ -299,5 +305,5 @@ size_t EnumerateCollectionBlock::skipSome(size_t atLeast, size_t atMost) {
|
|||
return skipped;
|
||||
|
||||
// cppcheck-suppress style
|
||||
DEBUG_END_BLOCK();
|
||||
DEBUG_END_BLOCK();
|
||||
}
|
||||
|
|
|
@ -27,12 +27,14 @@
|
|||
#include "Basics/StringUtils.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Rest/HttpRequest.h"
|
||||
#include "Transaction/Hints.h"
|
||||
#include "Transaction/StandaloneContext.h"
|
||||
#include "Utils/OperationOptions.h"
|
||||
#include "Utils/SingleCollectionTransaction.h"
|
||||
#include "Transaction/StandaloneContext.h"
|
||||
#include "Transaction/Hints.h"
|
||||
#include "VocBase/vocbase.h"
|
||||
|
||||
#include "Logger/Logger.h"
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::basics;
|
||||
using namespace arangodb::rest;
|
||||
|
@ -80,8 +82,7 @@ bool RestDocumentHandler::createDocument() {
|
|||
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
|
||||
|
||||
if (suffixes.size() > 1) {
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_SUPERFLUOUS_SUFFICES,
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_SUPERFLUOUS_SUFFICES,
|
||||
"superfluous suffix, expecting " + DOCUMENT_PATH +
|
||||
"?collection=<identifier>");
|
||||
return false;
|
||||
|
@ -105,8 +106,7 @@ bool RestDocumentHandler::createDocument() {
|
|||
}
|
||||
|
||||
bool parseSuccess = true;
|
||||
std::shared_ptr<VPackBuilder> parsedBody =
|
||||
parseVelocyPackBody(parseSuccess);
|
||||
std::shared_ptr<VPackBuilder> parsedBody = parseVelocyPackBody(parseSuccess);
|
||||
if (!parseSuccess) {
|
||||
return false;
|
||||
}
|
||||
|
@ -114,10 +114,14 @@ bool RestDocumentHandler::createDocument() {
|
|||
VPackSlice body = parsedBody->slice();
|
||||
|
||||
arangodb::OperationOptions opOptions;
|
||||
opOptions.isRestore = extractBooleanParameter(StaticStrings::IsRestoreString, false);
|
||||
opOptions.waitForSync = extractBooleanParameter(StaticStrings::WaitForSyncString, false);
|
||||
opOptions.returnNew = extractBooleanParameter(StaticStrings::ReturnNewString, false);
|
||||
opOptions.silent = extractBooleanParameter(StaticStrings::SilentString, false);
|
||||
opOptions.isRestore =
|
||||
extractBooleanParameter(StaticStrings::IsRestoreString, false);
|
||||
opOptions.waitForSync =
|
||||
extractBooleanParameter(StaticStrings::WaitForSyncString, false);
|
||||
opOptions.returnNew =
|
||||
extractBooleanParameter(StaticStrings::ReturnNewString, false);
|
||||
opOptions.silent =
|
||||
extractBooleanParameter(StaticStrings::SilentString, false);
|
||||
|
||||
// find and load collection given by name or identifier
|
||||
auto transactionContext(transaction::StandaloneContext::Create(_vocbase));
|
||||
|
@ -131,6 +135,8 @@ bool RestDocumentHandler::createDocument() {
|
|||
Result res = trx.begin();
|
||||
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "trx: " << res.errorMessage();
|
||||
|
||||
generateTransactionError(collectionName, res, "");
|
||||
return false;
|
||||
}
|
||||
|
@ -138,6 +144,10 @@ bool RestDocumentHandler::createDocument() {
|
|||
arangodb::OperationResult result =
|
||||
trx.insert(collectionName, body, opOptions);
|
||||
|
||||
if (result.code != TRI_ERROR_NO_ERROR) {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "trx.insert: " << result.code;
|
||||
}
|
||||
|
||||
// Will commit if no error occured.
|
||||
// or abort if an error occured.
|
||||
// result stays valid!
|
||||
|
@ -149,6 +159,8 @@ bool RestDocumentHandler::createDocument() {
|
|||
}
|
||||
|
||||
if (!res.ok()) {
|
||||
LOG_TOPIC(ERR, Logger::FIXME) << "trx.finish: " << res.errorMessage();
|
||||
|
||||
generateTransactionError(collectionName, res, "");
|
||||
return false;
|
||||
}
|
||||
|
@ -200,19 +212,19 @@ bool RestDocumentHandler::readSingleDocument(bool generateBody) {
|
|||
|
||||
// check for an etag
|
||||
bool isValidRevision;
|
||||
TRI_voc_rid_t ifNoneRid =
|
||||
extractRevision("if-none-match", isValidRevision);
|
||||
TRI_voc_rid_t ifNoneRid = extractRevision("if-none-match", isValidRevision);
|
||||
if (!isValidRevision) {
|
||||
ifNoneRid = UINT64_MAX; // an impossible rev, so precondition failed will happen
|
||||
ifNoneRid =
|
||||
UINT64_MAX; // an impossible rev, so precondition failed will happen
|
||||
}
|
||||
|
||||
OperationOptions options;
|
||||
options.ignoreRevs = true;
|
||||
|
||||
TRI_voc_rid_t ifRid =
|
||||
extractRevision("if-match", isValidRevision);
|
||||
TRI_voc_rid_t ifRid = extractRevision("if-match", isValidRevision);
|
||||
if (!isValidRevision) {
|
||||
ifRid = UINT64_MAX; // an impossible rev, so precondition failed will happen
|
||||
ifRid =
|
||||
UINT64_MAX; // an impossible rev, so precondition failed will happen
|
||||
}
|
||||
|
||||
VPackBuilder builder;
|
||||
|
@ -286,8 +298,7 @@ bool RestDocumentHandler::checkDocument() {
|
|||
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
|
||||
|
||||
if (suffixes.size() != 2) {
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting URI /_api/document/<document-handle>");
|
||||
return false;
|
||||
}
|
||||
|
@ -325,11 +336,11 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
std::string msg("expecting ");
|
||||
msg.append(isPatch ? "PATCH" : "PUT");
|
||||
msg.append(
|
||||
" /_api/document/<collectionname> or /_api/document/<document-handle> "
|
||||
" /_api/document/<collectionname> or "
|
||||
"/_api/document/<document-handle> "
|
||||
"or /_api/document and query parameter 'collection'");
|
||||
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER, msg);
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, msg);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -350,8 +361,7 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
std::string msg(
|
||||
"collection must be given in URL path or query parameter "
|
||||
"'collection' must be specified");
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER, msg);
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, msg);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
|
@ -360,8 +370,7 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
}
|
||||
|
||||
bool parseSuccess = true;
|
||||
std::shared_ptr<VPackBuilder> parsedBody =
|
||||
parseVelocyPackBody(parseSuccess);
|
||||
std::shared_ptr<VPackBuilder> parsedBody = parseVelocyPackBody(parseSuccess);
|
||||
if (!parseSuccess) {
|
||||
return false;
|
||||
}
|
||||
|
@ -375,12 +384,18 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
}
|
||||
|
||||
OperationOptions opOptions;
|
||||
opOptions.isRestore = extractBooleanParameter(StaticStrings::IsRestoreString, false);
|
||||
opOptions.ignoreRevs = extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
|
||||
opOptions.waitForSync = extractBooleanParameter(StaticStrings::WaitForSyncString, false);
|
||||
opOptions.returnNew = extractBooleanParameter(StaticStrings::ReturnNewString, false);
|
||||
opOptions.returnOld = extractBooleanParameter(StaticStrings::ReturnOldString, false);
|
||||
opOptions.silent = extractBooleanParameter(StaticStrings::SilentString, false);
|
||||
opOptions.isRestore =
|
||||
extractBooleanParameter(StaticStrings::IsRestoreString, false);
|
||||
opOptions.ignoreRevs =
|
||||
extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
|
||||
opOptions.waitForSync =
|
||||
extractBooleanParameter(StaticStrings::WaitForSyncString, false);
|
||||
opOptions.returnNew =
|
||||
extractBooleanParameter(StaticStrings::ReturnNewString, false);
|
||||
opOptions.returnOld =
|
||||
extractBooleanParameter(StaticStrings::ReturnOldString, false);
|
||||
opOptions.silent =
|
||||
extractBooleanParameter(StaticStrings::SilentString, false);
|
||||
|
||||
// extract the revision, if single document variant and header given:
|
||||
std::shared_ptr<VPackBuilder> builder;
|
||||
|
@ -389,7 +404,7 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
bool isValidRevision;
|
||||
revision = extractRevision("if-match", isValidRevision);
|
||||
if (!isValidRevision) {
|
||||
revision = UINT64_MAX; // an impossible revision, so precondition failed
|
||||
revision = UINT64_MAX; // an impossible revision, so precondition failed
|
||||
}
|
||||
VPackSlice keyInBody = body.get(StaticStrings::KeyString);
|
||||
if ((revision != 0 && TRI_ExtractRevisionId(body) != revision) ||
|
||||
|
@ -403,7 +418,7 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
builder->add(StaticStrings::KeyString, VPackValue(key));
|
||||
if (revision != 0) {
|
||||
builder->add(StaticStrings::RevString,
|
||||
VPackValue(TRI_RidToString(revision)));
|
||||
VPackValue(TRI_RidToString(revision)));
|
||||
}
|
||||
}
|
||||
body = builder->slice();
|
||||
|
@ -435,8 +450,10 @@ bool RestDocumentHandler::modifyDocument(bool isPatch) {
|
|||
OperationResult result(TRI_ERROR_NO_ERROR);
|
||||
if (isPatch) {
|
||||
// patching an existing document
|
||||
opOptions.keepNull = extractBooleanParameter(StaticStrings::KeepNullString, true);
|
||||
opOptions.mergeObjects = extractBooleanParameter(StaticStrings::MergeObjectsString, true);
|
||||
opOptions.keepNull =
|
||||
extractBooleanParameter(StaticStrings::KeepNullString, true);
|
||||
opOptions.mergeObjects =
|
||||
extractBooleanParameter(StaticStrings::MergeObjectsString, true);
|
||||
result = trx.update(collectionName, body, opOptions);
|
||||
} else {
|
||||
result = trx.replace(collectionName, body, opOptions);
|
||||
|
@ -473,8 +490,7 @@ bool RestDocumentHandler::deleteDocument() {
|
|||
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
|
||||
|
||||
if (suffixes.size() < 1 || suffixes.size() > 2) {
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting DELETE /_api/document/<document-handle> or "
|
||||
"/_api/document/<collection> with a BODY");
|
||||
return false;
|
||||
|
@ -493,15 +509,19 @@ bool RestDocumentHandler::deleteDocument() {
|
|||
bool isValidRevision = false;
|
||||
revision = extractRevision("if-match", isValidRevision);
|
||||
if (!isValidRevision) {
|
||||
revision = UINT64_MAX; // an impossible revision, so precondition failed
|
||||
revision = UINT64_MAX; // an impossible revision, so precondition failed
|
||||
}
|
||||
}
|
||||
|
||||
OperationOptions opOptions;
|
||||
opOptions.returnOld = extractBooleanParameter(StaticStrings::ReturnOldString, false);
|
||||
opOptions.ignoreRevs = extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
|
||||
opOptions.waitForSync = extractBooleanParameter(StaticStrings::WaitForSyncString, false);
|
||||
opOptions.silent = extractBooleanParameter(StaticStrings::SilentString, false);
|
||||
opOptions.returnOld =
|
||||
extractBooleanParameter(StaticStrings::ReturnOldString, false);
|
||||
opOptions.ignoreRevs =
|
||||
extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
|
||||
opOptions.waitForSync =
|
||||
extractBooleanParameter(StaticStrings::WaitForSyncString, false);
|
||||
opOptions.silent =
|
||||
extractBooleanParameter(StaticStrings::SilentString, false);
|
||||
|
||||
auto transactionContext(transaction::StandaloneContext::Create(_vocbase));
|
||||
|
||||
|
@ -527,16 +547,16 @@ bool RestDocumentHandler::deleteDocument() {
|
|||
} catch (...) {
|
||||
// If an error occurs here the body is not parsable. Fail with bad
|
||||
// parameter
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER, "Request body not parseable");
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"Request body not parseable");
|
||||
return false;
|
||||
}
|
||||
search = builderPtr->slice();
|
||||
}
|
||||
|
||||
if (!search.isArray() && !search.isObject()) {
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER, "Request body not parseable");
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"Request body not parseable");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -581,8 +601,7 @@ bool RestDocumentHandler::readManyDocuments() {
|
|||
std::vector<std::string> const& suffixes = _request->decodedSuffixes();
|
||||
|
||||
if (suffixes.size() != 1) {
|
||||
generateError(rest::ResponseCode::BAD,
|
||||
TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting PUT /_api/document/<collection> with a BODY");
|
||||
return false;
|
||||
}
|
||||
|
@ -591,7 +610,8 @@ bool RestDocumentHandler::readManyDocuments() {
|
|||
std::string const& collectionName = suffixes[0];
|
||||
|
||||
OperationOptions opOptions;
|
||||
opOptions.ignoreRevs = extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
|
||||
opOptions.ignoreRevs =
|
||||
extractBooleanParameter(StaticStrings::IgnoreRevsString, true);
|
||||
|
||||
auto transactionContext(transaction::StandaloneContext::Create(_vocbase));
|
||||
SingleCollectionTransaction trx(transactionContext, collectionName,
|
||||
|
|
|
@ -521,14 +521,14 @@ int RocksDBCollection::read(transaction::Methods* trx,
|
|||
// found
|
||||
return TRI_ERROR_NO_ERROR;
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::FIXME)
|
||||
/*LOG_TOPIC(ERR, Logger::FIXME)
|
||||
<< "#" << trx->state()->id() << " failed to read revision "
|
||||
<< token.revisionId() << " for key " << key.copyString();
|
||||
<< token.revisionId() << " for key " << key.copyString();*/
|
||||
}
|
||||
} else {
|
||||
LOG_TOPIC(ERR, Logger::DEVEL) << "#" << trx->state()->id()
|
||||
/*LOG_TOPIC(ERR, Logger::DEVEL) << "#" << trx->state()->id()
|
||||
<< " failed to find token for "
|
||||
<< key.copyString() << " in read";
|
||||
<< key.copyString() << " in read";*/
|
||||
}
|
||||
|
||||
// not found
|
||||
|
@ -1274,6 +1274,10 @@ arangodb::Result RocksDBCollection::lookupRevisionVPack(
|
|||
if (result.ok()) {
|
||||
mdr.setManaged(std::move(value), revisionId);
|
||||
} else {
|
||||
/*LOG_TOPIC(ERR, Logger::ENGINES)
|
||||
<< "#" << trx->state()->id() << " LOOKUP REVISION FAILED. COLLECTION '"
|
||||
<< _logicalCollection->name() << "', OBJECTID: " << _objectId
|
||||
<< ", REVISIONID: " << revisionId << "; " << result.errorNumber();*/
|
||||
mdr.reset();
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -99,8 +99,11 @@ void RocksDBEngine::collectOptions(
|
|||
new UInt64Parameter(&_maxTransactionSize));
|
||||
|
||||
// control intermediate transactions in RocksDB
|
||||
_intermediateTransactionSize = (_maxTransactionSize / 5) * 4; // transaction size that will trigger an intermediate commit
|
||||
_intermediateTransactionNumber = 100 * 1000; // number operation after that a commit will be tried
|
||||
_intermediateTransactionSize =
|
||||
(_maxTransactionSize / 5) *
|
||||
4; // transaction size that will trigger an intermediate commit
|
||||
_intermediateTransactionNumber =
|
||||
100 * 1000; // number operation after that a commit will be tried
|
||||
_intermediateTransactionEnabled = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ RocksDBIndex::RocksDBIndex(
|
|||
std::vector<std::vector<arangodb::basics::AttributeName>> const& attributes,
|
||||
bool unique, bool sparse, uint64_t objectId)
|
||||
: Index(id, collection, attributes, unique, sparse),
|
||||
_objectId(objectId ? objectId : TRI_NewTickServer()),
|
||||
_objectId((objectId != 0) ? objectId : TRI_NewTickServer()),
|
||||
_cmp(static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->cmp()),
|
||||
_cacheManager(CacheManagerFeature::MANAGER),
|
||||
_cache(nullptr),
|
||||
|
@ -52,7 +52,11 @@ RocksDBIndex::RocksDBIndex(TRI_idx_iid_t id, LogicalCollection* collection,
|
|||
_cmp(static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->cmp()),
|
||||
_cacheManager(CacheManagerFeature::MANAGER),
|
||||
_cache(nullptr),
|
||||
_useCache(false) {}
|
||||
_useCache(false) {
|
||||
if (_objectId == 0) {
|
||||
_objectId = TRI_NewTickServer();
|
||||
}
|
||||
}
|
||||
|
||||
RocksDBIndex::~RocksDBIndex() {
|
||||
if (_useCache && _cache != nullptr) {
|
||||
|
@ -65,8 +69,7 @@ RocksDBIndex::~RocksDBIndex() {
|
|||
}
|
||||
|
||||
/// @brief return a VelocyPack representation of the index
|
||||
void RocksDBIndex::toVelocyPack(VPackBuilder& builder,
|
||||
bool withFigures) const {
|
||||
void RocksDBIndex::toVelocyPack(VPackBuilder& builder, bool withFigures) const {
|
||||
Index::toVelocyPack(builder, withFigures);
|
||||
builder.add("objectId", VPackValue(std::to_string(_objectId)));
|
||||
}
|
||||
|
|
|
@ -28,9 +28,9 @@
|
|||
#include "Indexes/Index.h"
|
||||
#include "RocksDBEngine/RocksDBEdgeIndex.h"
|
||||
#include "RocksDBEngine/RocksDBEngine.h"
|
||||
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
|
||||
#include "RocksDBEngine/RocksDBPersistentIndex.h"
|
||||
#include "RocksDBEngine/RocksDBHashIndex.h"
|
||||
#include "RocksDBEngine/RocksDBPersistentIndex.h"
|
||||
#include "RocksDBEngine/RocksDBPrimaryIndex.h"
|
||||
#include "RocksDBEngine/RocksDBSkiplistIndex.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
#include "VocBase/ticks.h"
|
||||
|
@ -152,7 +152,7 @@ static int EnhanceJsonIndexSkiplist(VPackSlice const definition,
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
static int EnhanceJsonIndexPersistent(VPackSlice const definition,
|
||||
VPackBuilder& builder, bool create) {
|
||||
VPackBuilder& builder, bool create) {
|
||||
int res = ProcessIndexFields(definition, builder, 0, create);
|
||||
if (res == TRI_ERROR_NO_ERROR) {
|
||||
ProcessIndexSparseFlag(definition, builder, create);
|
||||
|
@ -294,7 +294,7 @@ int RocksDBIndexFactory::enhanceIndexDefinition(VPackSlice const definition,
|
|||
case Index::TRI_IDX_TYPE_SKIPLIST_INDEX:
|
||||
res = EnhanceJsonIndexSkiplist(definition, enhanced, create);
|
||||
break;
|
||||
|
||||
|
||||
case Index::TRI_IDX_TYPE_PERSISTENT_INDEX:
|
||||
res = EnhanceJsonIndexPersistent(definition, enhanced, create);
|
||||
break;
|
||||
|
@ -377,14 +377,13 @@ std::shared_ptr<Index> RocksDBIndexFactory::prepareIndexFromSlice(
|
|||
VPackSlice fields = info.get("fields");
|
||||
TRI_ASSERT(fields.isArray() && fields.length() == 1);
|
||||
std::string direction = fields.at(0).copyString();
|
||||
TRI_ASSERT(direction == StaticStrings::FromString
|
||||
|| direction == StaticStrings::ToString);
|
||||
newIdx.reset(
|
||||
new arangodb::RocksDBEdgeIndex(iid, col, info, direction));
|
||||
TRI_ASSERT(direction == StaticStrings::FromString ||
|
||||
direction == StaticStrings::ToString);
|
||||
newIdx.reset(new arangodb::RocksDBEdgeIndex(iid, col, info, direction));
|
||||
break;
|
||||
}
|
||||
//case arangodb::Index::TRI_IDX_TYPE_GEO1_INDEX:
|
||||
//case arangodb::Index::TRI_IDX_TYPE_GEO2_INDEX:
|
||||
// case arangodb::Index::TRI_IDX_TYPE_GEO1_INDEX:
|
||||
// case arangodb::Index::TRI_IDX_TYPE_GEO2_INDEX:
|
||||
case arangodb::Index::TRI_IDX_TYPE_HASH_INDEX: {
|
||||
newIdx.reset(new arangodb::RocksDBHashIndex(iid, col, info));
|
||||
break;
|
||||
|
@ -403,7 +402,7 @@ std::shared_ptr<Index> RocksDBIndexFactory::prepareIndexFromSlice(
|
|||
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL, "invalid index type");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
TRI_ASSERT(newIdx != nullptr);
|
||||
return newIdx;
|
||||
}
|
||||
|
@ -415,7 +414,7 @@ void RocksDBIndexFactory::fillSystemIndexes(
|
|||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
builder.close();
|
||||
|
||||
|
||||
systemIndexes.emplace_back(
|
||||
std::make_shared<arangodb::RocksDBPrimaryIndex>(col, builder.slice()));
|
||||
// create edges indexes
|
||||
|
@ -428,5 +427,6 @@ void RocksDBIndexFactory::fillSystemIndexes(
|
|||
}
|
||||
|
||||
std::vector<std::string> RocksDBIndexFactory::supportedIndexes() const {
|
||||
return std::vector<std::string>{ "primary", "edge", "hash", "skiplist", "persistent" };
|
||||
return std::vector<std::string>{"primary", "edge", "hash", "skiplist",
|
||||
"persistent"};
|
||||
}
|
||||
|
|
|
@ -24,9 +24,9 @@
|
|||
|
||||
#include "RocksDBEngine/RocksDBKey.h"
|
||||
#include "Basics/Exceptions.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBTypes.h"
|
||||
#include "Logger/Logger.h"
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rocksutils;
|
||||
|
@ -47,14 +47,15 @@ RocksDBKey RocksDBKey::Document(uint64_t collectionId,
|
|||
return RocksDBKey(RocksDBEntryType::Document, collectionId, revisionId);
|
||||
}
|
||||
|
||||
RocksDBKey RocksDBKey::PrimaryIndexValue(uint64_t indexId,
|
||||
arangodb::StringRef const& primaryKey) {
|
||||
RocksDBKey RocksDBKey::PrimaryIndexValue(
|
||||
uint64_t indexId, arangodb::StringRef const& primaryKey) {
|
||||
return RocksDBKey(RocksDBEntryType::PrimaryIndexValue, indexId, primaryKey);
|
||||
}
|
||||
|
||||
RocksDBKey RocksDBKey::PrimaryIndexValue(uint64_t indexId,
|
||||
char const* primaryKey) {
|
||||
return RocksDBKey(RocksDBEntryType::PrimaryIndexValue, indexId, StringRef(primaryKey));
|
||||
return RocksDBKey(RocksDBEntryType::PrimaryIndexValue, indexId,
|
||||
StringRef(primaryKey));
|
||||
}
|
||||
|
||||
RocksDBKey RocksDBKey::EdgeIndexValue(uint64_t indexId,
|
||||
|
@ -84,7 +85,6 @@ RocksDBKey RocksDBKey::CounterValue(uint64_t objectId) {
|
|||
return RocksDBKey(RocksDBEntryType::CounterValue, objectId);
|
||||
}
|
||||
|
||||
|
||||
RocksDBEntryType RocksDBKey::type(RocksDBKey const& key) {
|
||||
return type(key._buffer.data(), key._buffer.size());
|
||||
}
|
||||
|
@ -248,7 +248,8 @@ RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first,
|
|||
}
|
||||
}
|
||||
|
||||
RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first, arangodb::StringRef const& second)
|
||||
RocksDBKey::RocksDBKey(RocksDBEntryType type, uint64_t first,
|
||||
arangodb::StringRef const& second)
|
||||
: _type(type), _buffer() {
|
||||
switch (_type) {
|
||||
case RocksDBEntryType::PrimaryIndexValue: {
|
||||
|
@ -362,7 +363,7 @@ arangodb::StringRef RocksDBKey::primaryKey(char const* data, size_t size) {
|
|||
RocksDBEntryType type = static_cast<RocksDBEntryType>(data[0]);
|
||||
switch (type) {
|
||||
case RocksDBEntryType::PrimaryIndexValue: {
|
||||
TRI_ASSERT(size > (sizeof(char) + sizeof(uint64_t) + sizeof(uint8_t)));
|
||||
TRI_ASSERT(size >= (sizeof(char) + sizeof(uint64_t) + sizeof(char)));
|
||||
size_t keySize = size - (sizeof(char) + sizeof(uint64_t));
|
||||
return arangodb::StringRef(data + sizeof(char) + sizeof(uint64_t),
|
||||
keySize);
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "Cache/CachedValue.h"
|
||||
#include "Cache/TransactionalCache.h"
|
||||
#include "Indexes/SimpleAttributeEqualityMatcher.h"
|
||||
#include "Logger/Logger.h"
|
||||
#include "RocksDBEngine/RocksDBCollection.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBComparator.h"
|
||||
|
@ -114,6 +115,7 @@ RocksDBAllIndexIterator::RocksDBAllIndexIterator(
|
|||
LogicalCollection* collection, transaction::Methods* trx,
|
||||
ManagedDocumentResult* mmdr, RocksDBPrimaryIndex const* index, bool reverse)
|
||||
: IndexIterator(collection, trx, mmdr, index),
|
||||
_index(index),
|
||||
_cmp(index->_cmp),
|
||||
_reverse(reverse),
|
||||
_bounds(RocksDBKeyBounds::PrimaryIndex(index->objectId())) {
|
||||
|
@ -141,6 +143,13 @@ bool RocksDBAllIndexIterator::next(TokenCallback const& cb, size_t limit) {
|
|||
|
||||
while (limit > 0) {
|
||||
RocksDBToken token(RocksDBValue::revisionId(_iterator->value()));
|
||||
/*LOG_TOPIC(ERR, Logger::FIXME)
|
||||
<< "AllIndexIterator '" << _collection->name() << "' ("
|
||||
<< static_cast<RocksDBCollection*>(_collection->getPhysical())
|
||||
->objectId()
|
||||
<< ", " << _index->objectId()
|
||||
<< "): " << RocksDBKey::primaryKey(_iterator->key()) << " (" << limit
|
||||
<< "); " << token.revisionId();*/
|
||||
cb(token);
|
||||
|
||||
--limit;
|
||||
|
@ -192,7 +201,7 @@ RocksDBAnyIndexIterator::RocksDBAnyIndexIterator(
|
|||
_iterator.reset(rtrx->GetIterator(options));
|
||||
_total = collection->numberDocuments(trx);
|
||||
uint64_t off = RandomGenerator::interval(_total - 1);
|
||||
//uint64_t goal = off;
|
||||
// uint64_t goal = off;
|
||||
if (_total > 0) {
|
||||
if (off <= _total / 2) {
|
||||
_iterator->Seek(_bounds.start());
|
||||
|
@ -255,8 +264,15 @@ RocksDBPrimaryIndex::RocksDBPrimaryIndex(
|
|||
StaticStrings::KeyString, false)}}),
|
||||
true, false,
|
||||
basics::VelocyPackHelper::stringUInt64(info, "objectId")) {
|
||||
_useCache = true;
|
||||
createCache();
|
||||
if (_objectId != 0) {
|
||||
_useCache = true;
|
||||
createCache();
|
||||
/*LOG_TOPIC(ERR, Logger::FIXME)
|
||||
<< "primary index objectId: " << _objectId << " ("
|
||||
<< static_cast<RocksDBCollection*>(collection->getPhysical())
|
||||
->objectId()
|
||||
<< ", " << collection->name() << ")";*/
|
||||
}
|
||||
}
|
||||
|
||||
RocksDBPrimaryIndex::~RocksDBPrimaryIndex() {}
|
||||
|
@ -389,11 +405,12 @@ int RocksDBPrimaryIndex::insert(transaction::Methods* trx,
|
|||
}
|
||||
}
|
||||
|
||||
/*LOG_TOPIC(ERR, Logger::FIXME)
|
||||
<< "#" << trx->state()->id() << "'" << _collection->name() << "' ("
|
||||
<< static_cast<RocksDBCollection*>(_collection->getPhysical())->objectId()
|
||||
<< ", " << _objectId << ") primary insert " << revisionId;*/
|
||||
auto status = rtrx->Put(key.string(), value.string());
|
||||
if (!status.ok()) {
|
||||
/*LOG_TOPIC(ERR, Logger::FIXME)
|
||||
<< "#" << trx->state()->id() << " failed to insert "
|
||||
<< StringRef(slice.get(StaticStrings::KeyString)).toString();*/
|
||||
auto converted =
|
||||
rocksutils::convertStatus(status, rocksutils::StatusHint::index);
|
||||
return converted.errorNumber();
|
||||
|
|
|
@ -89,6 +89,7 @@ class RocksDBAllIndexIterator final : public IndexIterator {
|
|||
private:
|
||||
bool outOfRange() const;
|
||||
|
||||
RocksDBPrimaryIndex const* _index;
|
||||
RocksDBComparator const* _cmp;
|
||||
bool const _reverse;
|
||||
std::unique_ptr<rocksdb::Iterator> _iterator;
|
||||
|
|
|
@ -512,6 +512,7 @@ int RocksDBVPackIndex::insert(transaction::Methods* trx,
|
|||
if (res == TRI_ERROR_ARANGO_UNIQUE_CONSTRAINT_VIOLATED && !_unique) {
|
||||
// We ignore unique_constraint violated if we are not unique
|
||||
res = TRI_ERROR_NO_ERROR;
|
||||
// TODO: remove this? seems dangerous...
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue