diff --git a/arangod/VocBase/server.cpp b/arangod/VocBase/server.cpp index b5099b403a..588b29b457 100644 --- a/arangod/VocBase/server.cpp +++ b/arangod/VocBase/server.cpp @@ -31,6 +31,7 @@ #include "Aql/QueryRegistry.h" #include "Basics/Exceptions.h" #include "Basics/FileUtils.h" +#include "Basics/HybridLogicalClock.h" #include "Basics/MutexLocker.h" #include "Basics/StringUtils.h" #include "Basics/VelocyPackHelper.h" @@ -106,6 +107,12 @@ static uint16_t ServerIdentifier = 0; static std::atomic CurrentTick(0); +//////////////////////////////////////////////////////////////////////////////// +/// @brief a hybrid logical clock +//////////////////////////////////////////////////////////////////////////////// + +static HybridLogicalClock hybridLogicalClock; + //////////////////////////////////////////////////////////////////////////////// /// @brief the server's global id //////////////////////////////////////////////////////////////////////////////// @@ -2060,6 +2067,18 @@ void TRI_GetDatabaseDefaultsServer(TRI_server_t* server, TRI_voc_tick_t TRI_NewTickServer() { return ++CurrentTick; } +//////////////////////////////////////////////////////////////////////////////// +/// @brief create a new tick, using a hybrid logical clock +//////////////////////////////////////////////////////////////////////////////// + +TRI_voc_tick_t TRI_NewTickServerHLC(void) { + return hybridLogicalClock.getTimeStamp(); +} + +TRI_voc_tick_t TRI_NewTickServerHLC(TRI_voc_tick_t received) { + return hybridLogicalClock.getTimeStamp(received); +} + //////////////////////////////////////////////////////////////////////////////// /// @brief updates the tick counter, with lock //////////////////////////////////////////////////////////////////////////////// diff --git a/arangod/VocBase/server.h b/arangod/VocBase/server.h index 390ffde642..eb5c04a8d3 100644 --- a/arangod/VocBase/server.h +++ b/arangod/VocBase/server.h @@ -250,6 +250,13 @@ void TRI_GetDatabaseDefaultsServer(TRI_server_t*, TRI_vocbase_defaults_t*); TRI_voc_tick_t TRI_NewTickServer(void); +//////////////////////////////////////////////////////////////////////////////// +/// @brief create a new tick, using a hybrid logical clock +//////////////////////////////////////////////////////////////////////////////// + +TRI_voc_tick_t TRI_NewTickServerHLC(void); +TRI_voc_tick_t TRI_NewTickServerHLC(TRI_voc_tick_t received); + //////////////////////////////////////////////////////////////////////////////// /// @brief updates the tick counter, with lock //////////////////////////////////////////////////////////////////////////////// diff --git a/lib/Basics/HybridLogicalClock.h b/lib/Basics/HybridLogicalClock.h new file mode 100644 index 0000000000..6a4655d105 --- /dev/null +++ b/lib/Basics/HybridLogicalClock.h @@ -0,0 +1,113 @@ +//////////////////////////////////////////////////////////////////////////////// +/// DISCLAIMER +/// +/// Copyright 2014-2016 ArangoDB GmbH, Cologne, Germany +/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany +/// +/// Licensed under the Apache License, Version 2.0 (the "License"); +/// you may not use this file except in compliance with the License. +/// You may obtain a copy of the License at +/// +/// http://www.apache.org/licenses/LICENSE-2.0 +/// +/// Unless required by applicable law or agreed to in writing, software +/// distributed under the License is distributed on an "AS IS" BASIS, +/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +/// See the License for the specific language governing permissions and +/// limitations under the License. +/// +/// Copyright holder is ArangoDB GmbH, Cologne, Germany +/// +/// @author Max Neunhoeffer +//////////////////////////////////////////////////////////////////////////////// + +#ifndef ARANGODB_BASICS_HYBRID_LOGICAL_CLOCK_H +#define ARANGODB_BASICS_HYBRID_LOGICAL_CLOCK_H 1 + +#include + +class HybridLogicalClock { + std::chrono::high_resolution_clock clock; + std::atomic lastTimeStamp; + + public: + HybridLogicalClock() : lastTimeStamp(0) { + } + HybridLogicalClock(HybridLogicalClock const& other) = delete; + HybridLogicalClock(HybridLogicalClock&& other) = delete; + HybridLogicalClock& operator=(HybridLogicalClock const& other) = delete; + HybridLogicalClock& operator=(HybridLogicalClock&& other) = delete; + + uint64_t getTimeStamp() { + uint64_t oldTimeStamp; + uint64_t newTimeStamp; + do { + uint64_t physical = getPhysicalTime(); + oldTimeStamp = lastTimeStamp.load(std::memory_order_relaxed); + uint64_t oldTime = extractTime(oldTimeStamp); + newTimeStamp = (physical <= oldTime) ? + assembleTimeStamp(oldTime, extractCount(oldTimeStamp)+1) : + assembleTimeStamp(physical, 0); + } while (!lastTimeStamp.compare_exchange_weak(oldTimeStamp, newTimeStamp, + std::memory_order_release, std::memory_order_relaxed)); + return newTimeStamp; + } + + // Call the following when a message with a time stamp has been received: + uint64_t getTimeStamp(uint64_t receivedTimeStamp) { + uint64_t oldTimeStamp; + uint64_t newTimeStamp; + do { + uint64_t physical = getPhysicalTime(); + oldTimeStamp = lastTimeStamp.load(std::memory_order_relaxed); + uint64_t oldTime = extractTime(oldTimeStamp); + uint64_t recTime = extractTime(receivedTimeStamp); + uint64_t newTime = (std::max)((std::max)(oldTime, physical),recTime); + // Note that this implies newTime >= oldTime and newTime >= recTime + uint64_t newCount; + if (newTime == oldTime) { + if (newTime == recTime) { + // all three identical + newCount = (std::max)(extractCount(oldTime), extractCount(recTime))+1; + } else { + // this means recTime < newTime + newCount = extractCount(oldTime) + 1; + } + } else { + // newTime > oldTime + if (newTime == recTime) { + newCount = extractCount(recTime) + 1; + } else { + newCount = 0; + } + } + newTimeStamp = assembleTimeStamp(newTime, newCount); + } while (!lastTimeStamp.compare_exchange_weak(oldTimeStamp, newTimeStamp, + std::memory_order_release, std::memory_order_relaxed)); + return newTimeStamp; + } + + private: + // Helper to get the physical time in milliseconds since the epoch: + uint64_t getPhysicalTime() { + auto now = clock.now(); + uint64_t ms = std::chrono::duration_cast( + now.time_since_epoch()).count(); + return ms; + } + + static uint64_t extractTime(uint64_t t) { + return t >> 20; + } + + static uint64_t extractCount(uint64_t t) { + return t & 0xfffffUL; + } + + static uint64_t assembleTimeStamp(uint64_t time, uint64_t count) { + return (time << 20) | count; + } + +}; + +#endif