diff --git a/arangod/RocksDBEngine/CMakeLists.txt b/arangod/RocksDBEngine/CMakeLists.txt index 2632e1c1ce..7754f370ca 100644 --- a/arangod/RocksDBEngine/CMakeLists.txt +++ b/arangod/RocksDBEngine/CMakeLists.txt @@ -14,6 +14,7 @@ set(ROCKSDB_SOURCES RocksDBEngine/RocksDBKey.cpp RocksDBEngine/RocksDBKeyBounds.cpp RocksDBEngine/RocksDBPrimaryIndex.cpp + RocksDBEngine/RocksDBRestWalHandler.cpp RocksDBEngine/RocksDBTransactionCollection.cpp RocksDBEngine/RocksDBTransactionState.cpp RocksDBEngine/RocksDBTypes.cpp diff --git a/arangod/RocksDBEngine/RocksDBCollection.cpp b/arangod/RocksDBEngine/RocksDBCollection.cpp index a5b3e5892b..8e1500aac2 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.cpp +++ b/arangod/RocksDBEngine/RocksDBCollection.cpp @@ -848,12 +848,11 @@ void RocksDBCollection::addIndexCoordinator( int RocksDBCollection::saveIndex(transaction::Methods* trx, std::shared_ptr idx) { TRI_ASSERT(!ServerState::instance()->isCoordinator()); - // we cannot persist PrimaryMockIndex + // we cannot persist primary or edge indexes TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_PRIMARY_INDEX); - std::vector> indexListLocal; - indexListLocal.emplace_back(idx); + TRI_ASSERT(idx->type() != Index::IndexType::TRI_IDX_TYPE_EDGE_INDEX); - Result res = fillIndexes(trx, indexListLocal); + Result res = fillIndexes(trx, idx); if (!res.ok()) { return res.errorNumber(); } @@ -870,28 +869,25 @@ int RocksDBCollection::saveIndex(transaction::Methods* trx, } arangodb::Result RocksDBCollection::fillIndexes(transaction::Methods* trx, - std::vector> added) { + std::shared_ptr added) { ManagedDocumentResult mmr; std::unique_ptr iter(primaryIndex()->allIterator(trx, &mmr, false)); + int res = TRI_ERROR_NO_ERROR; std::vector tokens; auto cb = [&](DocumentIdentifierToken token) { - - bool ret = this->readDocument(trx, token, mmr); - if (ret) { - for (std::shared_ptr index : added) { - RocksDBIndex *ridx = static_cast(index.get()); - ridx->insert(trx, mmr.lastRevisionId(), VPackSlice(mmr.vpack()), false); - } + if (res == TRI_ERROR_NO_ERROR && this->readDocument(trx, token, mmr)) { + RocksDBIndex *ridx = static_cast(added.get()); + res = ridx->insert(trx, mmr.lastRevisionId(), VPackSlice(mmr.vpack()), false); } }; - while (iter->next(cb, 10000)) { + while (iter->next(cb, 1000) && res == TRI_ERROR_NO_ERROR) { if (_logicalCollection->deleted()) { return Result(TRI_ERROR_INTERNAL); } } - return Result(); + return Result(res); } // @brief return the primary index diff --git a/arangod/RocksDBEngine/RocksDBCollection.h b/arangod/RocksDBEngine/RocksDBCollection.h index 16ef200f1d..3316fe4f71 100644 --- a/arangod/RocksDBEngine/RocksDBCollection.h +++ b/arangod/RocksDBEngine/RocksDBCollection.h @@ -189,7 +189,7 @@ class RocksDBCollection final : public PhysicalCollection { std::shared_ptr idx); arangodb::Result fillIndexes(transaction::Methods*, - std::vector>); + std::shared_ptr); arangodb::RocksDBPrimaryIndex* primaryIndex() const; diff --git a/arangod/RocksDBEngine/RocksDBEngine.cpp b/arangod/RocksDBEngine/RocksDBEngine.cpp index 69eae9ed16..103e304d8b 100644 --- a/arangod/RocksDBEngine/RocksDBEngine.cpp +++ b/arangod/RocksDBEngine/RocksDBEngine.cpp @@ -31,6 +31,8 @@ #include "Logger/Logger.h" #include "ProgramOptions/ProgramOptions.h" #include "ProgramOptions/Section.h" +#include "GeneralServer/RestHandlerFactory.h" +#include "RestHandler/RestHandlerCreator.h" #include "RestServer/DatabasePathFeature.h" #include "RestServer/ViewTypesFeature.h" #include "RocksDBEngine/RocksDBCollection.h" @@ -40,6 +42,7 @@ #include "RocksDBEngine/RocksDBIndex.h" #include "RocksDBEngine/RocksDBIndexFactory.h" #include "RocksDBEngine/RocksDBKey.h" +#include "RocksDBEngine/RocksDBRestWalHandler.h" #include "RocksDBEngine/RocksDBTransactionCollection.h" #include "RocksDBEngine/RocksDBTransactionContextData.h" #include "RocksDBEngine/RocksDBTransactionState.h" @@ -720,8 +723,13 @@ void RocksDBEngine::addV8Functions() { } /// @brief Add engine-specific REST handlers -void RocksDBEngine::addRestHandlers(rest::RestHandlerFactory*) { +void RocksDBEngine::addRestHandlers(rest::RestHandlerFactory* handlerFactory) { // TODO: add /_api/export and /_admin/wal later + handlerFactory->addPrefixHandler("/_admin/wal", + RestHandlerCreator::createNoData); + + //handlerFactory->addPrefixHandler( + // "/_api/export", RestHandlerCreator::createNoData); } Result RocksDBEngine::dropDatabase(TRI_voc_tick_t id) { diff --git a/arangod/RocksDBEngine/RocksDBRestWalHandler.cpp b/arangod/RocksDBEngine/RocksDBRestWalHandler.cpp new file mode 100644 index 0000000000..bf33791692 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBRestWalHandler.cpp @@ -0,0 +1,252 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Jan Steemann +//////////////////////////////////////////////////////////////////////////////// + +#include "RocksDBRestWalHandler.h" +#include "Basics/VelocyPackHelper.h" +#include "Cluster/ClusterMethods.h" +#include "Cluster/ServerState.h" +#include "RocksDBEngine/RocksDBCommon.h" +#include "RocksDBEngine/RocksDBEngine.h" +#include "StorageEngine/EngineSelectorFeature.h" + +#include + +using namespace arangodb; +using namespace arangodb::rest; + +RocksDBRestWalHandler::RocksDBRestWalHandler(GeneralRequest* request, + GeneralResponse* response) + : RestVocbaseBaseHandler(request, response) {} + +RestStatus RocksDBRestWalHandler::execute() { + std::vector const& suffixes = _request->suffixes(); + + if (suffixes.size() != 1) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, + "expecting /_admin/wal/"); + return RestStatus::DONE; + } + + std::string const& operation = suffixes[0]; + + // extract the sub-request type + auto const type = _request->requestType(); + + if (operation == "transactions") { + if (type == rest::RequestType::GET) { + transactions(); + return RestStatus::DONE; + } + } else if (operation == "flush") { + if (type == rest::RequestType::PUT) { + flush(); + return RestStatus::DONE; + } + } else if (operation == "properties") { + if (type == rest::RequestType::GET || type == rest::RequestType::PUT) { + properties(); + return RestStatus::DONE; + } + } else { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, + "expecting /_admin/wal/"); + return RestStatus::DONE; + } + + generateError(rest::ResponseCode::METHOD_NOT_ALLOWED, + TRI_ERROR_HTTP_METHOD_NOT_ALLOWED); + return RestStatus::DONE; +} + +void RocksDBRestWalHandler::properties() { + /*auto l = MMFilesLogfileManager::instance(); + + if (_request->requestType() == rest::RequestType::PUT) { + std::shared_ptr parsedRequest; + VPackSlice slice; + try { + slice = _request->payload(); + } catch (...) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid body value. expecting object"); + return; + } + if (!slice.isObject()) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid body value. expecting object"); + } + + if (slice.hasKey("allowOversizeEntries")) { + bool value = slice.get("allowOversizeEntries").getBoolean(); + l->allowOversizeEntries(value); + } + + if (slice.hasKey("logfileSize")) { + uint32_t value = slice.get("logfileSize").getNumericValue(); + l->filesize(value); + } + + if (slice.hasKey("historicLogfiles")) { + uint32_t value = + slice.get("historicLogfiles").getNumericValue(); + l->historicLogfiles(value); + } + + if (slice.hasKey("reserveLogfiles")) { + uint32_t value = slice.get("reserveLogfiles").getNumericValue(); + l->reserveLogfiles(value); + } + + if (slice.hasKey("throttleWait")) { + uint64_t value = slice.get("throttleWait").getNumericValue(); + l->maxThrottleWait(value); + } + + if (slice.hasKey("throttleWhenPending")) { + uint64_t value = + slice.get("throttleWhenPending").getNumericValue(); + l->throttleWhenPending(value); + } + } + + VPackBuilder builder; + builder.openObject(); + builder.add("allowOversizeEntries", VPackValue(l->allowOversizeEntries())); + builder.add("logfileSize", VPackValue(l->filesize())); + builder.add("historicLogfiles", VPackValue(l->historicLogfiles())); + builder.add("reserveLogfiles", VPackValue(l->reserveLogfiles())); + builder.add("syncInterval", VPackValue(l->syncInterval())); + builder.add("throttleWait", VPackValue(l->maxThrottleWait())); + builder.add("throttleWhenPending", VPackValue(l->throttleWhenPending())); + + builder.close();*/ + generateResult(rest::ResponseCode::NOT_IMPLEMENTED, + basics::VelocyPackHelper::EmptyObjectValue()); +} + +void RocksDBRestWalHandler::flush() { + std::shared_ptr parsedRequest; + VPackSlice slice; + try { + slice = _request->payload(); + } catch (...) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid body value. expecting object"); + return; + } + if (!slice.isObject() && !slice.isNone()) { + generateError(rest::ResponseCode::BAD, TRI_ERROR_HTTP_BAD_PARAMETER, + "invalid body value. expecting object"); + } + + bool waitForSync = false; + bool waitForCollector = false; + + if (slice.isObject()) { + // got a request body + VPackSlice value = slice.get("waitForSync"); + if (value.isString()) { + waitForSync = (value.copyString() == "true"); + } else if (value.isBoolean()) { + waitForSync = value.getBoolean(); + } + + value = slice.get("waitForCollector"); + if (value.isString()) { + waitForCollector = (value.copyString() == "true"); + } else if (value.isBoolean()) { + waitForCollector = value.getBoolean(); + } + } else { + // no request body + bool found; + { + std::string const& v = _request->value("waitForSync", found); + if (found) { + waitForSync = (v == "1" || v == "true"); + } + } + { + std::string const& v = _request->value("waitForCollector", found); + if (found) { + waitForCollector = (v == "1" || v == "true"); + } + } + } + + int res = TRI_ERROR_NO_ERROR; + if (ServerState::instance()->isCoordinator()) { + res = flushWalOnAllDBServers(waitForSync, waitForCollector); + } else { + rocksdb::TransactionDB* db = + static_cast(EngineSelectorFeature::ENGINE)->db(); + + rocksdb::Status status = db->GetBaseDB()->SyncWAL(); + if (!status.ok()) { + res = rocksutils::convertStatus(status).errorNumber(); + } + // res = MMFilesLogfileManager::instance()->flush( + // waitForSync, waitForCollector, false); + } + + if (res != TRI_ERROR_NO_ERROR) { + THROW_ARANGO_EXCEPTION(res); + } + generateResult(rest::ResponseCode::OK, + basics::VelocyPackHelper::EmptyObjectValue()); +} + +void RocksDBRestWalHandler::transactions() { + VPackBuilder builder; + + /*auto const& info = + MMFilesLogfileManager::instance()->runningTransactions(); + + builder.openObject(); + builder.add("runningTransactions", + VPackValue(static_cast(std::get<0>(info)))); + + // lastCollectedId + { + auto value = std::get<1>(info); + if (value == UINT64_MAX) { + builder.add("minLastCollected", VPackValue(VPackValueType::Null)); + } else { + builder.add("minLastCollected", VPackValue(value)); + } + } + + // lastSealedId + { + auto value = std::get<2>(info); + if (value == UINT64_MAX) { + builder.add("minLastSealed", VPackValue(VPackValueType::Null)); + } else { + builder.add("minLastSealed", VPackValue(value)); + } + } + + builder.close();*/ + + generateResult(rest::ResponseCode::NOT_IMPLEMENTED, builder.slice()); +} diff --git a/arangod/RocksDBEngine/RocksDBRestWalHandler.h b/arangod/RocksDBEngine/RocksDBRestWalHandler.h new file mode 100644 index 0000000000..711dabde95 --- /dev/null +++ b/arangod/RocksDBEngine/RocksDBRestWalHandler.h @@ -0,0 +1,47 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2017 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Simon Grätzer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGOD_ROCKSDB_REST_WAL_HANDLER_H +#define ARANGOD_MMFILES_MMFILES_REST_WAL_HANDLER_H 1 + +#include "Basics/Common.h" +#include "RestHandler/RestVocbaseBaseHandler.h" + +namespace arangodb { + +class RocksDBRestWalHandler : public RestVocbaseBaseHandler { + public: + RocksDBRestWalHandler(GeneralRequest*, GeneralResponse*); + + public: + RestStatus execute() override final; + char const* name() const override final { return "RocksDBRestWalHandler"; } + + private: + void flush(); + void transactions(); + void properties(); +}; +} + +#endif