diff --git a/arangod/Agency/Store.cpp b/arangod/Agency/Store.cpp index 486e780f3e..b09a233972 100644 --- a/arangod/Agency/Store.cpp +++ b/arangod/Agency/Store.cpp @@ -37,51 +37,18 @@ #include #include #include +#include #include #include #include -#include using namespace arangodb::consensus; using namespace arangodb::basics; -/// Non-Emptyness of string -struct NotEmpty { - bool operator()(const std::string& s) { return !s.empty(); } -}; - -/// @brief Split strings by separator -inline static std::vector split(const std::string& str, char separator) { - std::vector result; - if (str.empty()) { - return result; - } - std::regex reg("/+"); - std::string key = std::regex_replace(str, reg, "/"); - - if (!key.empty() && key.front() == '/') { - key.erase(0, 1); - } - if (!key.empty() && key.back() == '/') { - key.pop_back(); - } - - std::string::size_type p = 0; - std::string::size_type q; - while ((q = key.find(separator, p)) != std::string::npos) { - result.emplace_back(key, p, q - p); - p = q + 1; - } - result.emplace_back(key, p); - result.erase(std::find_if(result.rbegin(), result.rend(), NotEmpty()).base(), - result.end()); - return result; -} - /// Build endpoint from URL -inline static bool endpointPathFromUrl(std::string const& url, - std::string& endpoint, std::string& path) { +static bool endpointPathFromUrl(std::string const& url, + std::string& endpoint, std::string& path) { std::stringstream ep; path = "/"; size_t pos = 7; @@ -405,7 +372,7 @@ check_ret_t Store::check(VPackSlice const& slice, CheckMode mode) const { for (auto const& precond : VPackObjectIterator(slice)) { // Preconditions std::string key = precond.key.copyString(); - std::vector pv = split(key, '/'); + std::vector pv = split(key); Node const* node = &Node::dummyNode(); @@ -578,7 +545,7 @@ bool Store::read(VPackSlice const& query, Builder& ret) const { MUTEX_LOCKER(storeLocker, _storeLock); // Freeze KV-Store for read if (query_strs.size() == 1) { auto const& path = query_strs[0]; - std::vector pv = split(path, '/'); + std::vector pv = split(path); // Build surrounding object structure: size_t e = _node.exists(pv).size(); // note: e <= pv.size()! size_t i = 0; @@ -599,7 +566,7 @@ bool Store::read(VPackSlice const& query, Builder& ret) const { // Create response tree Node copy("copy"); for (auto const& path : query_strs) { - std::vector pv = split(path, '/'); + std::vector pv = split(path); size_t e = _node.exists(pv).size(); if (e == pv.size()) { // existing copy(pv) = _node(pv); @@ -652,7 +619,7 @@ void Store::dumpToBuilder(Builder& builder) const { MUTEX_LOCKER(storeLocker, _storeLock); toBuilder(builder, true); - std::map clean {}; + std::map clean; for (auto const& i : _timeTable) { auto ts = std::chrono::duration_cast( i.first.time_since_epoch()).count(); @@ -688,41 +655,78 @@ void Store::dumpToBuilder(Builder& builder) const { /// Apply transaction to key value store. Guarded by caller bool Store::applies(arangodb::velocypack::Slice const& transaction) { - std::vector keys; - std::vector abskeys; - std::vector idx; - std::regex reg("/+"); - size_t counter = 0; - - for (const auto& atom : VPackObjectIterator(transaction)) { - std::string key(atom.key.copyString()); - keys.push_back(key); - key = std::regex_replace(key, reg, "/"); - abskeys.push_back(((key[0] == '/') ? key : std::string("/") + key)); - idx.push_back(counter++); - } - - sort(idx.begin(), idx.end(), - [&abskeys](size_t i1, size_t i2) { return abskeys[i1] < abskeys[i2]; }); - _storeLock.assertLockedByCurrentThread(); + auto it = VPackObjectIterator(transaction); + + std::vector abskeys; + abskeys.reserve(it.size()); + + std::vector> idx; + idx.reserve(it.size()); + + size_t counter = 0; + while (it.valid()) { + VPackStringRef key = it.key().stringRef(); + + // push back an empty string first, so we can avoid a later move + abskeys.emplace_back(); + + // and now work on this string + std::string& absoluteKey = abskeys.back(); + absoluteKey.reserve(key.size()); + + // turn the path into an absolute path, collapsing all duplicate + // forward slashes into a single forward slash + char const* p = key.data(); + char const* e = key.data() + key.size(); + char last = '\0'; + if (p == e || *p != '/') { + // key must start with '/' + absoluteKey.push_back('/'); + last = '/'; + } + while (p < e) { + char current = *p; + if (current != '/' || last != '/') { + // avoid repeated forward slashes + absoluteKey.push_back(current); + last = current; + } + ++p; + } + + TRI_ASSERT(!absoluteKey.empty()); + TRI_ASSERT(absoluteKey[0] == '/'); + TRI_ASSERT(absoluteKey.find("//") == std::string::npos); + + idx.emplace_back(counter++, it.value()); + + it.next(); + } + + std::sort(idx.begin(), idx.end(), + [&abskeys](std::pair const& i1, std::pair const& i2) noexcept { + return abskeys[i1.first] < abskeys[i2.first]; + }); + for (const auto& i : idx) { - std::string const& key = keys.at(i); - Slice value = transaction.get(key); + Slice value = i.second; if (value.isObject() && value.hasKey("op")) { - if (value.get("op").isEqualString("delete") || - value.get("op").isEqualString("replace") || - value.get("op").isEqualString("erase")) { - if (!_node.has(abskeys.at(i))) { + Slice const op = value.get("op"); + + if (op.isEqualString("delete") || + op.isEqualString("replace") || + op.isEqualString("erase")) { + if (!_node.has(abskeys.at(i.first))) { continue; } } - auto uri = Node::normalize(abskeys.at(i)); - if (value.get("op").isEqualString("observe")) { + auto uri = Node::normalize(abskeys.at(i.first)); + if (op.isEqualString("observe")) { bool found = false; - if (value.hasKey("url") && value.get("url").isString()) { + if (value.get("url").isString()) { auto url = value.get("url").copyString(); auto ret = _observerTable.equal_range(url); for (auto it = ret.first; it != ret.second; ++it) { @@ -736,8 +740,8 @@ bool Store::applies(arangodb::velocypack::Slice const& transaction) { _observedTable.emplace(std::pair(uri, url)); } } - } else if (value.get("op").isEqualString("unobserve")) { - if (value.hasKey("url") && value.get("url").isString()) { + } else if (op.isEqualString("unobserve")) { + if (value.get("url").isString()) { auto url = value.get("url").copyString(); auto ret = _observerTable.equal_range(url); for (auto it = ret.first; it != ret.second; ++it) { @@ -755,10 +759,10 @@ bool Store::applies(arangodb::velocypack::Slice const& transaction) { } } } else { - _node.hasAsWritableNode(abskeys.at(i)).first.applieOp(value); + _node.hasAsWritableNode(abskeys.at(i.first)).first.applieOp(value); } } else { - _node.hasAsWritableNode(abskeys.at(i)).first.applies(value); + _node.hasAsWritableNode(abskeys.at(i.first)).first.applies(value); } } @@ -884,3 +888,42 @@ void Store::removeTTL(std::string const& uri) { } } } + +/// @brief Split strings by forward slashes, omitting empty strings +std::vector Store::split(std::string const& str) { + std::vector result; + + char const* p = str.data(); + char const* e = str.data() + str.size(); + + // strip leading forward slashes + while (p != e && *p == '/') { + ++p; + } + + // strip trailing forward slashes + while (p != e && *(e - 1) == '/') { + --e; + } + + char const* start = nullptr; + while (p != e) { + if (*p == '/') { + if (start != nullptr) { + // had already found something + result.emplace_back(start, p - start); + start = nullptr; + } + } else { + if (start == nullptr) { + start = p; + } + } + ++p; + } + if (start != nullptr) { + result.emplace_back(start, p - start); + } + + return result; +} diff --git a/arangod/Agency/Store.h b/arangod/Agency/Store.h index 05b69d197d..5d113d7bf3 100644 --- a/arangod/Agency/Store.h +++ b/arangod/Agency/Store.h @@ -148,6 +148,10 @@ class Store { std::unordered_multimap& observedTable(); std::unordered_multimap const& observedTable() const; + /// @brief Split strings by forward slashes, omitting empty strings + /// this function is only public so that it can be test by unit tests + static std::vector split(std::string const& str); + private: /// @brief Check precondition check_ret_t check(arangodb::velocypack::Slice const&, CheckMode = FIRST_FAIL) const; diff --git a/tests/Agency/StoreTest.cpp b/tests/Agency/StoreTest.cpp index 404019bea9..8604113d27 100644 --- a/tests/Agency/StoreTest.cpp +++ b/tests/Agency/StoreTest.cpp @@ -69,3 +69,52 @@ TEST(StoreTest, store_preconditions) { ASSERT_EQ(node, other.slice()); } + +TEST(StoreTest, store_split) { + using namespace arangodb::consensus; + + ASSERT_EQ(std::vector(), Store::split("")); + ASSERT_EQ(std::vector(), Store::split("/")); + ASSERT_EQ(std::vector(), Store::split("//")); + ASSERT_EQ(std::vector(), Store::split("///")); + + ASSERT_EQ((std::vector{"a"}), Store::split("a")); + ASSERT_EQ((std::vector{"a c"}), Store::split("a c")); + ASSERT_EQ((std::vector{"foobar"}), Store::split("foobar")); + ASSERT_EQ((std::vector{"foo bar"}), Store::split("foo bar")); + + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("a/b/c")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("/a/b/c")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("/a/b/c/")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("//a/b/c")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("//a/b/c/")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("a/b/c//")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("//a/b/c//")); + ASSERT_EQ((std::vector{"a", "b", "c"}), Store::split("//a/b/c//")); + ASSERT_EQ((std::vector{"a"}), Store::split("//////a")); + ASSERT_EQ((std::vector{"a"}), Store::split("a//////////////")); + ASSERT_EQ((std::vector{"a"}), Store::split("/////////////a//////////////")); + ASSERT_EQ((std::vector{"foobar"}), Store::split("//////foobar")); + ASSERT_EQ((std::vector{"foobar"}), Store::split("foobar//////////////")); + ASSERT_EQ((std::vector{"foobar"}), Store::split("/////////////foobar//////////////")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("a/c")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("a//c")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("a///c")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("/a//c")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("/a//c")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("/a///c")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("/a//c/")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("/a//c//")); + ASSERT_EQ((std::vector{"a", "c"}), Store::split("/a///c//")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("foo/bar")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("foo//bar")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("foo///bar")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("/foo//bar")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("/foo//bar")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("/foo///bar")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("/foo//bar/")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("/foo//bar//")); + ASSERT_EQ((std::vector{"foo", "bar"}), Store::split("/foo///bar//")); + + ASSERT_EQ((std::vector{"foo", "bar", "baz"}), Store::split("/foo///bar//baz")); +}