1
0
Fork 0

Replace DataGuardian by ThreadProtector.

This commit is contained in:
Max Neunhoeffer 2015-08-06 13:03:37 +02:00 committed by Michael Hackstein
parent 5faf5d8248
commit 49f74b1baf
5 changed files with 211 additions and 244 deletions

View File

@ -1,244 +0,0 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Helper class to isolate data protection with hazard pointers.
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2015 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2015 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
/// @author Copyright 2015, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2009-2015, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_BASICS_DATA_GUARDIAN_H
#define ARANGODB_BASICS_DATA_GUARDIAN_H 1
namespace triagens {
namespace basics {
////////////////////////////////////////////////////////////////////////////////
/// @brief DataGuardian, a template class to manage a single pointer to some
/// class, optimized for many fast readers and slow writers using lockfree
/// hazard pointer technology.
////////////////////////////////////////////////////////////////////////////////
template<typename T>
class DataGuardian {
public:
////////////////////////////////////////////////////////////////////////////////
/// @brief HazardPtr, class for keeping a hazard pointer with appropriate
/// padding.
////////////////////////////////////////////////////////////////////////////////
class HazardPtr {
public:
std::atomic<T const*> ptr;
private:
char padding[64-sizeof(std::atomic<T const*>)];
};
////////////////////////////////////////////////////////////////////////////////
/// @brief constructor
////////////////////////////////////////////////////////////////////////////////
DataGuardian () {
_P[0].ptr = nullptr;
_P[1].ptr = nullptr;
_V = 0;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief destructor
////////////////////////////////////////////////////////////////////////////////
~DataGuardian () {
std::lock_guard<std::mutex> lock(_mutex);
while (isHazard(_P[_V].ptr)) {
usleep(250);
}
T const* temp = _P[_V].ptr.load();
delete temp; // OK, if nullptr
_P[_V].ptr = nullptr;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief registerHazard, every thread that wants to read the guarded pointer
/// has to create an instance of HazardPtr first and then register it with
/// this object. After use one has to unregisterHazard (see below) the
/// structure again.
////////////////////////////////////////////////////////////////////////////////
void registerHazard (HazardPtr& h) {
std::lock_guard<std::mutex> lock(_mutex);
_H.push_back(&h);
return;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unregisterHazard
////////////////////////////////////////////////////////////////////////////////
void unregisterHazard (HazardPtr& h) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto it = _H.begin(); it != _H.end(); it++) {
if (*it == &h) {
_H.erase(it);
return;
}
}
}
////////////////////////////////////////////////////////////////////////////////
/// @brief lease, call this with your registered HazardPtr structure before
/// using the pointer. It is safe to use this pointer until one calls unlease.
////////////////////////////////////////////////////////////////////////////////
T const* lease (HazardPtr& h) {
int v;
T const* p;
while (true) {
v = _V.load(std::memory_order_consume); // (XXX)
// This memory_order_consume corresponds to the change to _V
// in exchange() below which uses memory_order_seq_cst, which
// implies release semantics. This is important to ensure that
// we see the changes to _P just before the version _V
// is flipped.
p = _P[v].ptr.load(std::memory_order_relaxed);
h.ptr = p; // implicit memory_order_seq_cst
if (_V.load(std::memory_order_relaxed) != v) { // (YYY)
h.ptr = nullptr; // implicit memory_order_seq_cst
continue;
}
break;
};
return p;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief unlease, call this with your registered HazardPtr structure after
/// using the pointer.
////////////////////////////////////////////////////////////////////////////////
void unlease (HazardPtr& h) {
h.ptr = nullptr; // implicit memory_order_seq_cst
}
////////////////////////////////////////////////////////////////////////////////
/// @brief exchange, call this to exchange the guarded pointer. The function
/// can be slow and returns the old version. It is guaranteed that no reader
/// is still reading the old version when this method returns, therefore it
/// is safe to delete the pointer in that case.
////////////////////////////////////////////////////////////////////////////////
T const* exchange (T const* replacement) {
std::lock_guard<std::mutex> lock(_mutex);
int v = _V.load(std::memory_order_relaxed);
_P[1-v].ptr.store(replacement, std::memory_order_relaxed);
_V = 1-v; // implicit memory_order_seq_cst, whoever sees this
// also sees the two above modifications!
// Our job is essentially done, we only need to destroy
// the old value. However, this might be unsafe, because there might
// be a reader. All readers have indicated their reading activity
// with a store(std::memory_order_seq_cst) to _H[<theirId>]. After that
// indication, they have rechecked the value of _V and have thus
// confirmed that it was not yet changed. Therefore, we can simply
// observe _H[*] and wait until none is equal to _P[v]:
T const* p = _P[v].ptr.load(std::memory_order_relaxed);
while (isHazard(p)) {
usleep(250);
}
// Now it is safe to destroy _P[v]
_P[v].ptr = nullptr;
return p;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief internal check whether a given pointer is a hazard
////////////////////////////////////////////////////////////////////////////////
private:
bool isHazard (T const* p) {
for (size_t i = 0; i < _H.size(); i++) {
T const* g = _H[i]->ptr.load(std::memory_order_relaxed);
if (g != nullptr && g == p) {
return true;
}
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
/// @brief the two versions of the pointer we take care of
////////////////////////////////////////////////////////////////////////////////
HazardPtr _P[2];
////////////////////////////////////////////////////////////////////////////////
/// @brief current version of _P, can be 0 or 1
////////////////////////////////////////////////////////////////////////////////
std::atomic<int> _V;
////////////////////////////////////////////////////////////////////////////////
/// @brief pointers to all hazard pointer structures that are registered
////////////////////////////////////////////////////////////////////////////////
std::vector<HazardPtr*> _H;
////////////////////////////////////////////////////////////////////////////////
/// @brief a mutex, only used for slow operations
////////////////////////////////////////////////////////////////////////////////
std::mutex _mutex;
// Here is a proof that this is all OK: The mutex only ensures
// that there is always only at most one mutating thread.
// All is standard, except that we must ensure that whenever
// _V is changed the mutating thread knows about all readers
// that are still using the old version, which is done through
// _H[myId]->ptr where id is the id of a thread. The critical
// argument needed is the following: Both the change to
// _H[myId]->ptr in lease() and the change to _V in exchange() use
// memory_order_seq_cst, therefore they happen in some sequential
// order and all threads observe the same order. If the reader
// in line (YYY) above sees the same value as before in line
// (XXX), then any write to _V must be later in the total order
// of modifications than the change to _H[myId]->ptr. Therefore
// the mutating thread must see the change to _H[myId]->ptr, after
// all, it sees its own change to _V. Therefore it is ensured
// that _P[v].ptr is returned only when all reading threads have
// terminated their lease through unlease(), and thus it is safe
// to be deleted.
};
} // namespace triagens::basics
} // namespace triagens
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -0,0 +1,44 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Helper class to isolate data protection
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2015 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2015 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
/// @author Copyright 2015, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2009-2015, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#include "Basics/ThreadProtector.h"
template<int Nr>
thread_local int triagens::basics::ThreadProtector<Nr>::_mySlot = -1;
/// We need an explicit template initialisation for each value of the
/// template parameter, such that the compiler can allocate a thread local
/// variable for each of them.
template class triagens::basics::ThreadProtector<64>;
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -0,0 +1,165 @@
////////////////////////////////////////////////////////////////////////////////
/// @brief Helper class to isolate data protection
///
/// @file
///
/// DISCLAIMER
///
/// Copyright 2015 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2015 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
///
/// @author Max Neunhoeffer
/// @author Copyright 2015, ArangoDB GmbH, Cologne, Germany
/// @author Copyright 2009-2015, triAGENS GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////
#ifndef ARANGODB_BASICS_THREAD_PROTECTOR_H
#define ARANGODB_BASICS_THREAD_PROTECTOR_H 1
#include "Basics/Common.h"
namespace triagens {
namespace basics {
////////////////////////////////////////////////////////////////////////////////
/// @brief ThreadProtector, a template class to manage a single atomic
/// value (can be a pointer to some class), optimized for many fast
/// readers and slow writers using lockfree technology.
/// The template parameter Nr should be on the order of magnitude of the
/// maximal number of concurrently running threads.
/// Usage:
/// Put an instance of the ThreadProtector next to the atomic value you
/// want to protect, as in:
/// atomic<SomeClass*> p;
/// ThreadProtector<64> prot;
/// If you want to read p and *p, do
/// auto unuser(prot.use());
/// auto pSeen = p;
/// in the scope where you want to read p and *p and then only use pSeen
/// and *pSeen.
/// It is automatically released when the unuser instances goes out of scope.
/// This is guaranteed to be very fast, even when multiple threads do it
/// concurrently.
/// If you want to change p (and call delete on the old value, say), then
/// auto oldp = p; // save the old value of p
/// p = <new Value>; // just assign p whenever you see fit
/// prot.scan(); // This will block until no thread is reading
/// // the old value any more.
/// delete oldp; // guaranteed to be safe
/// Please note:
/// - The value of p *can* change under the feet of the reading threads,
/// which is why you need to use the pSeen variable. However, you know
/// that as long as unused is in scope, pSeen remains valid.
/// - The ThreadProtector instances needs 64*Nr bytes of memory.
/// - ThreadProtector.cpp needs to contain an explicit template
/// instanciation for all values of Nr used in the executable.
////////////////////////////////////////////////////////////////////////////////
template<int Nr>
class ThreadProtector {
struct alignas(64) Entry { // 64 is the size of a cache line,
// it is important that different list entries lie in different
// cache lines.
std::atomic<int> _count;
};
Entry* _list;
std::atomic<int> _last;
static thread_local int _mySlot;
public:
// A class to automatically unuse the threadprotector:
class UnUser {
ThreadProtector* _prot;
int _id;
public:
UnUser (ThreadProtector* p, int i) : _prot(p), _id(i) {
}
~UnUser () {
if (_prot != nullptr) {
_prot->unUse(_id);
}
}
// A move constructor
UnUser (UnUser&& that) : _prot(that._prot), _id(that._id) {
// Note that return value optimization will usually avoid
// this move constructor completely. However, it has to be
// present for the program to compile.
that._prot = nullptr;
}
// Explicitly delete the others:
UnUser (UnUser const& that) = delete;
UnUser& operator= (UnUser const& that) = delete;
UnUser& operator= (UnUser&& that) = delete;
UnUser () = delete;
};
ThreadProtector () : _last(0) {
_list = new Entry[Nr];
// Just to be sure:
for (size_t i = 0; i < Nr; i++) {
_list[i]._count = 0;
}
}
~ThreadProtector () {
delete[] _list;
}
UnUser use () {
int id = _mySlot;
if (id < 0) {
id = _last++;
if (_last > Nr) {
_last = 0;
}
_mySlot = id;
}
_list[id]._count++; // this is implicitly using memory_order_seq_cst
return UnUser(this, id); // return value optimization!
}
void scan () {
for (size_t i = 0; i < Nr; i++) {
while (_list[i]._count > 0) {
usleep(250);
}
}
}
private:
void unUse (int id) {
_list[id]._count--; // this is implicitly using memory_order_seq_cst
}
};
} // namespace triagens::basics
} // namespace triagens
#endif
// Local Variables:
// mode: outline-minor
// outline-regexp: "/// @brief\\|/// {@inheritDoc}\\|/// @page\\|// --SECTION--\\|/// @\\}"
// End:

View File

@ -93,6 +93,7 @@ add_library(
Basics/terminal-utils.cpp
Basics/Thread.cpp
Basics/ThreadPool.cpp
Basics/ThreadProtector.cpp
Basics/tri-strings.cpp
Basics/tri-zip.cpp
Basics/Utf8Helper.cpp

View File

@ -65,6 +65,7 @@ lib_libarango_a_SOURCES = \
lib/Basics/terminal-utils.cpp \
lib/Basics/Thread.cpp \
lib/Basics/ThreadPool.cpp \
lib/Basics/ThreadProtector.cpp \
lib/Basics/threads-posix.cpp \
lib/Basics/tri-strings.cpp \
lib/Basics/tri-zip.cpp \