mirror of https://gitee.com/bigwinds/arangodb
Added REST Wal handler
This commit is contained in:
parent
de388b10a0
commit
01ea3556b8
|
@ -14,6 +14,7 @@ set(ROCKSDB_SOURCES
|
|||
RocksDBEngine/RocksDBKey.cpp
|
||||
RocksDBEngine/RocksDBKeyBounds.cpp
|
||||
RocksDBEngine/RocksDBPrimaryIndex.cpp
|
||||
RocksDBEngine/RocksDBRestWalHandler.cpp
|
||||
RocksDBEngine/RocksDBTransactionCollection.cpp
|
||||
RocksDBEngine/RocksDBTransactionState.cpp
|
||||
RocksDBEngine/RocksDBTypes.cpp
|
||||
|
|
|
@ -848,12 +848,11 @@ void RocksDBCollection::addIndexCoordinator(
|
|||
int RocksDBCollection::saveIndex(transaction::Methods* trx,
|
||||
std::shared_ptr<arangodb::Index> idx) {
|
||||
TRI_ASSERT(!ServerState::instance()->isCoordinator());
|
||||
// we cannot persist PrimaryMockIndex
|
||||
// we cannot persist primary or edge indexes
|
||||
TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX);
|
||||
std::vector<std::shared_ptr<arangodb::Index>> indexListLocal;
|
||||
indexListLocal.emplace_back(idx);
|
||||
TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_EDGE_INDEX);
|
||||
|
||||
Result res = fillIndexes(trx, indexListLocal);
|
||||
Result res = fillIndexes(trx, idx);
|
||||
if (!res.ok()) {
|
||||
return res.errorNumber();
|
||||
}
|
||||
|
@ -870,28 +869,25 @@ int RocksDBCollection::saveIndex(transaction::Methods* trx,
|
|||
}
|
||||
|
||||
arangodb::Result RocksDBCollection::fillIndexes(transaction::Methods* trx,
|
||||
std::vector<std::shared_ptr<arangodb::Index>> added) {
|
||||
std::shared_ptr<arangodb::Index> added) {
|
||||
|
||||
ManagedDocumentResult mmr;
|
||||
std::unique_ptr<IndexIterator> iter(primaryIndex()->allIterator(trx, &mmr, false));
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
|
||||
std::vector<DocumentIdentifierToken> tokens;
|
||||
auto cb = [&](DocumentIdentifierToken token) {
|
||||
|
||||
bool ret = this->readDocument(trx, token, mmr);
|
||||
if (ret) {
|
||||
for (std::shared_ptr<arangodb::Index> index : added) {
|
||||
RocksDBIndex *ridx = static_cast<RocksDBIndex*>(index.get());
|
||||
ridx->insert(trx, mmr.lastRevisionId(), VPackSlice(mmr.vpack()), false);
|
||||
}
|
||||
if (res == TRI_ERROR_NO_ERROR && this->readDocument(trx, token, mmr)) {
|
||||
RocksDBIndex *ridx = static_cast<RocksDBIndex*>(added.get());
|
||||
res = ridx->insert(trx, mmr.lastRevisionId(), VPackSlice(mmr.vpack()), false);
|
||||
}
|
||||
};
|
||||
while (iter->next(cb, 10000)) {
|
||||
while (iter->next(cb, 1000) && res == TRI_ERROR_NO_ERROR) {
|
||||
if (_logicalCollection->deleted()) {
|
||||
return Result(TRI_ERROR_INTERNAL);
|
||||
}
|
||||
}
|
||||
return Result();
|
||||
return Result(res);
|
||||
}
|
||||
|
||||
// @brief return the primary index
|
||||
|
|
|
@ -189,7 +189,7 @@ class RocksDBCollection final : public PhysicalCollection {
|
|||
std::shared_ptr<arangodb::Index> idx);
|
||||
|
||||
arangodb::Result fillIndexes(transaction::Methods*,
|
||||
std::vector<std::shared_ptr<arangodb::Index>>);
|
||||
std::shared_ptr<arangodb::Index>);
|
||||
|
||||
arangodb::RocksDBPrimaryIndex* primaryIndex() const;
|
||||
|
||||
|
|
|
@ -31,6 +31,8 @@
|
|||
#include "Logger/Logger.h"
|
||||
#include "ProgramOptions/ProgramOptions.h"
|
||||
#include "ProgramOptions/Section.h"
|
||||
#include "GeneralServer/RestHandlerFactory.h"
|
||||
#include "RestHandler/RestHandlerCreator.h"
|
||||
#include "RestServer/DatabasePathFeature.h"
|
||||
#include "RestServer/ViewTypesFeature.h"
|
||||
#include "RocksDBEngine/RocksDBCollection.h"
|
||||
|
@ -40,6 +42,7 @@
|
|||
#include "RocksDBEngine/RocksDBIndex.h"
|
||||
#include "RocksDBEngine/RocksDBIndexFactory.h"
|
||||
#include "RocksDBEngine/RocksDBKey.h"
|
||||
#include "RocksDBEngine/RocksDBRestWalHandler.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionCollection.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionContextData.h"
|
||||
#include "RocksDBEngine/RocksDBTransactionState.h"
|
||||
|
@ -720,8 +723,13 @@ void RocksDBEngine::addV8Functions() {
|
|||
}
|
||||
|
||||
/// @brief Add engine-specific REST handlers
|
||||
void RocksDBEngine::addRestHandlers(rest::RestHandlerFactory*) {
|
||||
void RocksDBEngine::addRestHandlers(rest::RestHandlerFactory* handlerFactory) {
|
||||
// TODO: add /_api/export and /_admin/wal later
|
||||
handlerFactory->addPrefixHandler("/_admin/wal",
|
||||
RestHandlerCreator<RocksDBRestWalHandler>::createNoData);
|
||||
|
||||
//handlerFactory->addPrefixHandler(
|
||||
// "/_api/export", RestHandlerCreator<MMFilesRestExportHandler>::createNoData);
|
||||
}
|
||||
|
||||
Result RocksDBEngine::dropDatabase(TRI_voc_tick_t id) {
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Jan Steemann
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#include "RocksDBRestWalHandler.h"
|
||||
#include "Basics/VelocyPackHelper.h"
|
||||
#include "Cluster/ClusterMethods.h"
|
||||
#include "Cluster/ServerState.h"
|
||||
#include "RocksDBEngine/RocksDBCommon.h"
|
||||
#include "RocksDBEngine/RocksDBEngine.h"
|
||||
#include "StorageEngine/EngineSelectorFeature.h"
|
||||
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
|
||||
using namespace arangodb;
|
||||
using namespace arangodb::rest;
|
||||
|
||||
RocksDBRestWalHandler::RocksDBRestWalHandler(GeneralRequest* request,
|
||||
GeneralResponse* response)
|
||||
: RestVocbaseBaseHandler(request, response) {}
|
||||
|
||||
RestStatus RocksDBRestWalHandler::execute() {
|
||||
std::vector<std::string> const& suffixes = _request->suffixes();
|
||||
|
||||
if (suffixes.size() != 1) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting /_admin/wal/<operation>");
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
std::string const& operation = suffixes[0];
|
||||
|
||||
// extract the sub-request type
|
||||
auto const type = _request->requestType();
|
||||
|
||||
if (operation == "transactions") {
|
||||
if (type == rest::RequestType::GET) {
|
||||
transactions();
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
} else if (operation == "flush") {
|
||||
if (type == rest::RequestType::PUT) {
|
||||
flush();
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
} else if (operation == "properties") {
|
||||
if (type == rest::RequestType::GET || type == rest::RequestType::PUT) {
|
||||
properties();
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
} else {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"expecting /_admin/wal/<operation>");
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
generateError(rest::ResponseCode::METHOD_NOT_ALLOWED,
|
||||
TRI_ERROR_HTTP_METHOD_NOT_ALLOWED);
|
||||
return RestStatus::DONE;
|
||||
}
|
||||
|
||||
void RocksDBRestWalHandler::properties() {
|
||||
/*auto l = MMFilesLogfileManager::instance();
|
||||
|
||||
if (_request->requestType() == rest::RequestType::PUT) {
|
||||
std::shared_ptr<VPackBuilder> parsedRequest;
|
||||
VPackSlice slice;
|
||||
try {
|
||||
slice = _request->payload();
|
||||
} catch (...) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid body value. expecting object");
|
||||
return;
|
||||
}
|
||||
if (!slice.isObject()) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid body value. expecting object");
|
||||
}
|
||||
|
||||
if (slice.hasKey("allowOversizeEntries")) {
|
||||
bool value = slice.get("allowOversizeEntries").getBoolean();
|
||||
l->allowOversizeEntries(value);
|
||||
}
|
||||
|
||||
if (slice.hasKey("logfileSize")) {
|
||||
uint32_t value = slice.get("logfileSize").getNumericValue<uint32_t>();
|
||||
l->filesize(value);
|
||||
}
|
||||
|
||||
if (slice.hasKey("historicLogfiles")) {
|
||||
uint32_t value =
|
||||
slice.get("historicLogfiles").getNumericValue<uint32_t>();
|
||||
l->historicLogfiles(value);
|
||||
}
|
||||
|
||||
if (slice.hasKey("reserveLogfiles")) {
|
||||
uint32_t value = slice.get("reserveLogfiles").getNumericValue<uint32_t>();
|
||||
l->reserveLogfiles(value);
|
||||
}
|
||||
|
||||
if (slice.hasKey("throttleWait")) {
|
||||
uint64_t value = slice.get("throttleWait").getNumericValue<uint64_t>();
|
||||
l->maxThrottleWait(value);
|
||||
}
|
||||
|
||||
if (slice.hasKey("throttleWhenPending")) {
|
||||
uint64_t value =
|
||||
slice.get("throttleWhenPending").getNumericValue<uint64_t>();
|
||||
l->throttleWhenPending(value);
|
||||
}
|
||||
}
|
||||
|
||||
VPackBuilder builder;
|
||||
builder.openObject();
|
||||
builder.add("allowOversizeEntries", VPackValue(l->allowOversizeEntries()));
|
||||
builder.add("logfileSize", VPackValue(l->filesize()));
|
||||
builder.add("historicLogfiles", VPackValue(l->historicLogfiles()));
|
||||
builder.add("reserveLogfiles", VPackValue(l->reserveLogfiles()));
|
||||
builder.add("syncInterval", VPackValue(l->syncInterval()));
|
||||
builder.add("throttleWait", VPackValue(l->maxThrottleWait()));
|
||||
builder.add("throttleWhenPending", VPackValue(l->throttleWhenPending()));
|
||||
|
||||
builder.close();*/
|
||||
generateResult(rest::ResponseCode::NOT_IMPLEMENTED,
|
||||
basics::VelocyPackHelper::EmptyObjectValue());
|
||||
}
|
||||
|
||||
void RocksDBRestWalHandler::flush() {
|
||||
std::shared_ptr<VPackBuilder> parsedRequest;
|
||||
VPackSlice slice;
|
||||
try {
|
||||
slice = _request->payload();
|
||||
} catch (...) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid body value. expecting object");
|
||||
return;
|
||||
}
|
||||
if (!slice.isObject() && !slice.isNone()) {
|
||||
generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER,
|
||||
"invalid body value. expecting object");
|
||||
}
|
||||
|
||||
bool waitForSync = false;
|
||||
bool waitForCollector = false;
|
||||
|
||||
if (slice.isObject()) {
|
||||
// got a request body
|
||||
VPackSlice value = slice.get("waitForSync");
|
||||
if (value.isString()) {
|
||||
waitForSync = (value.copyString() == "true");
|
||||
} else if (value.isBoolean()) {
|
||||
waitForSync = value.getBoolean();
|
||||
}
|
||||
|
||||
value = slice.get("waitForCollector");
|
||||
if (value.isString()) {
|
||||
waitForCollector = (value.copyString() == "true");
|
||||
} else if (value.isBoolean()) {
|
||||
waitForCollector = value.getBoolean();
|
||||
}
|
||||
} else {
|
||||
// no request body
|
||||
bool found;
|
||||
{
|
||||
std::string const& v = _request->value("waitForSync", found);
|
||||
if (found) {
|
||||
waitForSync = (v == "1" || v == "true");
|
||||
}
|
||||
}
|
||||
{
|
||||
std::string const& v = _request->value("waitForCollector", found);
|
||||
if (found) {
|
||||
waitForCollector = (v == "1" || v == "true");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int res = TRI_ERROR_NO_ERROR;
|
||||
if (ServerState::instance()->isCoordinator()) {
|
||||
res = flushWalOnAllDBServers(waitForSync, waitForCollector);
|
||||
} else {
|
||||
rocksdb::TransactionDB* db =
|
||||
static_cast<RocksDBEngine*>(EngineSelectorFeature::ENGINE)->db();
|
||||
|
||||
rocksdb::Status status = db->GetBaseDB()->SyncWAL();
|
||||
if (!status.ok()) {
|
||||
res = rocksutils::convertStatus(status).errorNumber();
|
||||
}
|
||||
// res = MMFilesLogfileManager::instance()->flush(
|
||||
// waitForSync, waitForCollector, false);
|
||||
}
|
||||
|
||||
if (res != TRI_ERROR_NO_ERROR) {
|
||||
THROW_ARANGO_EXCEPTION(res);
|
||||
}
|
||||
generateResult(rest::ResponseCode::OK,
|
||||
basics::VelocyPackHelper::EmptyObjectValue());
|
||||
}
|
||||
|
||||
void RocksDBRestWalHandler::transactions() {
|
||||
VPackBuilder builder;
|
||||
|
||||
/*auto const& info =
|
||||
MMFilesLogfileManager::instance()->runningTransactions();
|
||||
|
||||
builder.openObject();
|
||||
builder.add("runningTransactions",
|
||||
VPackValue(static_cast<double>(std::get<0>(info))));
|
||||
|
||||
// lastCollectedId
|
||||
{
|
||||
auto value = std::get<1>(info);
|
||||
if (value == UINT64_MAX) {
|
||||
builder.add("minLastCollected", VPackValue(VPackValueType::Null));
|
||||
} else {
|
||||
builder.add("minLastCollected", VPackValue(value));
|
||||
}
|
||||
}
|
||||
|
||||
// lastSealedId
|
||||
{
|
||||
auto value = std::get<2>(info);
|
||||
if (value == UINT64_MAX) {
|
||||
builder.add("minLastSealed", VPackValue(VPackValueType::Null));
|
||||
} else {
|
||||
builder.add("minLastSealed", VPackValue(value));
|
||||
}
|
||||
}
|
||||
|
||||
builder.close();*/
|
||||
|
||||
generateResult(rest::ResponseCode::NOT_IMPLEMENTED, builder.slice());
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
/// DISCLAIMER
|
||||
///
|
||||
/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany
|
||||
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
|
||||
///
|
||||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
/// you may not use this file except in compliance with the License.
|
||||
/// You may obtain a copy of the License at
|
||||
///
|
||||
/// http://www.apache.org/licenses/LICENSE-2.0
|
||||
///
|
||||
/// Unless required by applicable law or agreed to in writing, software
|
||||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
/// See the License for the specific language governing permissions and
|
||||
/// limitations under the License.
|
||||
///
|
||||
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
///
|
||||
/// @author Simon Grätzer
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#ifndef ARANGOD_ROCKSDB_REST_WAL_HANDLER_H
|
||||
#define ARANGOD_MMFILES_MMFILES_REST_WAL_HANDLER_H 1
|
||||
|
||||
#include "Basics/Common.h"
|
||||
#include "RestHandler/RestVocbaseBaseHandler.h"
|
||||
|
||||
namespace arangodb {
|
||||
|
||||
class RocksDBRestWalHandler : public RestVocbaseBaseHandler {
|
||||
public:
|
||||
RocksDBRestWalHandler(GeneralRequest*, GeneralResponse*);
|
||||
|
||||
public:
|
||||
RestStatus execute() override final;
|
||||
char const* name() const override final { return "RocksDBRestWalHandler"; }
|
||||
|
||||
private:
|
||||
void flush();
|
||||
void transactions();
|
||||
void properties();
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue