Commit 561b171a authored by Tom Barbette's avatar Tom Barbette

More locks

parent b867ec79
Pipeline #1850 passed with stage
in 10 minutes and 22 seconds
...@@ -322,16 +322,18 @@ Counter::reset() ...@@ -322,16 +322,18 @@ Counter::reset()
CounterBase::reset(); CounterBase::reset();
} }
CounterMP::CounterMP() template <typename T>
CounterMPBase<T>::CounterMPBase()
{ {
} }
template <typename T>
CounterMP::~CounterMP() CounterMPBase<T>::~CounterMPBase()
{ {
} }
template <typename T>
int int
CounterMP::initialize(ErrorHandler *errh) { CounterMPBase<T>::initialize(ErrorHandler *errh) {
if (CounterBase::initialize(errh) != 0) if (CounterBase::initialize(errh) != 0)
return -1; return -1;
//If not in simple mode, we only allow one writer so we can sum up the total number of threads //If not in simple mode, we only allow one writer so we can sum up the total number of threads
...@@ -340,27 +342,29 @@ CounterMP::initialize(ErrorHandler *errh) { ...@@ -340,27 +342,29 @@ CounterMP::initialize(ErrorHandler *errh) {
return 0; return 0;
} }
template <typename T>
Packet* Packet*
CounterMP::simple_action(Packet *p) CounterMPBase<T>::simple_action(Packet *p)
{ {
if (_atomic > 0) if (_atomic > 0)
_atomic_lock.write_begin(); _atomic_lock.write_begin();
_stats->_count++; _stats->_count++;
_stats->_byte_count += p->length(); _stats->_byte_count += p->length();
if (unlikely(!_simple)) if (unlikely(!_simple))
check_handlers(CounterMP::count(), CounterMP::byte_count()); //BUG : if not atomic, then handler may be called twice check_handlers(CounterMPBase<T>::count(), CounterMPBase<T>::byte_count()); //BUG : if not atomic, then handler may be called twice
if (_atomic > 0) if (_atomic > 0)
_atomic_lock.write_end(); _atomic_lock.write_end();
return p; return p;
} }
#if HAVE_BATCH #if HAVE_BATCH
template <typename T>
PacketBatch* PacketBatch*
CounterMP::simple_action_batch(PacketBatch *batch) CounterMPBase<T>::simple_action_batch(PacketBatch *batch)
{ {
if (unlikely(_batch_precise)) { if (unlikely(_batch_precise)) {
FOR_EACH_PACKET(batch,p) FOR_EACH_PACKET(batch,p)
CounterMP::simple_action(p); CounterMPBase<T>::simple_action(p);
return batch; return batch;
} }
...@@ -373,15 +377,16 @@ CounterMP::simple_action_batch(PacketBatch *batch) ...@@ -373,15 +377,16 @@ CounterMP::simple_action_batch(PacketBatch *batch)
_stats->_count += batch->count(); _stats->_count += batch->count();
_stats->_byte_count += bc; _stats->_byte_count += bc;
if (unlikely(!_simple)) if (unlikely(!_simple))
check_handlers(CounterMP::count(), CounterMP::byte_count()); check_handlers(CounterMPBase<T>::count(), CounterMPBase<T>::byte_count());
if (_atomic > 0) if (_atomic > 0)
_atomic_lock.write_end(); _atomic_lock.write_end();
return batch; return batch;
} }
#endif #endif
template <typename T>
void void
CounterMP::reset() CounterMPBase<T>::reset()
{ {
if (_atomic > 0) if (_atomic > 0)
_atomic_lock.write_begin(); _atomic_lock.write_begin();
...@@ -394,6 +399,11 @@ CounterMP::reset() ...@@ -394,6 +399,11 @@ CounterMP::reset()
_atomic_lock.write_end(); _atomic_lock.write_end();
} }
template class CounterMPBase<rXwlockPR>;
template class CounterRxWMPBase<rXwlock>;
template class CounterRxWMPBase<rXwlockPR>;
CounterRxWMP::CounterRxWMP() CounterRxWMP::CounterRxWMP()
{ {
_atomic = 2; _atomic = 2;
...@@ -403,6 +413,25 @@ CounterRxWMP::~CounterRxWMP() ...@@ -403,6 +413,25 @@ CounterRxWMP::~CounterRxWMP()
{ {
} }
CounterRxWMPPR::CounterRxWMPPR()
{
_atomic = 2;
}
CounterRxWMPPR::~CounterRxWMPPR()
{
}
CounterRxWMPPW::CounterRxWMPPW()
{
_atomic = 2;
}
CounterRxWMPPW::~CounterRxWMPPW()
{
}
CounterLockMP::CounterLockMP() CounterLockMP::CounterLockMP()
{ {
_atomic = 1; _atomic = 1;
...@@ -913,6 +942,10 @@ EXPORT_ELEMENT(CounterMP) ...@@ -913,6 +942,10 @@ EXPORT_ELEMENT(CounterMP)
ELEMENT_MT_SAFE(CounterMP) ELEMENT_MT_SAFE(CounterMP)
EXPORT_ELEMENT(CounterRxWMP) EXPORT_ELEMENT(CounterRxWMP)
ELEMENT_MT_SAFE(CounterRxWMP) ELEMENT_MT_SAFE(CounterRxWMP)
EXPORT_ELEMENT(CounterRxWMPPR)
ELEMENT_MT_SAFE(CounterRxWMPPR)
EXPORT_ELEMENT(CounterRxWMPPW)
ELEMENT_MT_SAFE(CounterRxWMPPW)
EXPORT_ELEMENT(CounterLockMP) EXPORT_ELEMENT(CounterLockMP)
ELEMENT_MT_SAFE(CounterLockMP) ELEMENT_MT_SAFE(CounterLockMP)
EXPORT_ELEMENT(CounterPLockMP) EXPORT_ELEMENT(CounterPLockMP)
......
...@@ -252,11 +252,11 @@ protected: ...@@ -252,11 +252,11 @@ protected:
counter_int_type _byte_count; counter_int_type _byte_count;
}; };
template <typename T>
class CounterMPBase : public CounterBase { public:
class CounterMP : public CounterBase { public: CounterMPBase() CLICK_COLD;
~CounterMPBase() CLICK_COLD;
CounterMP() CLICK_COLD;
~CounterMP() CLICK_COLD;
const char *class_name() const { return "CounterMP"; } const char *class_name() const { return "CounterMP"; }
const char *processing() const { return AGNOSTIC; } const char *processing() const { return AGNOSTIC; }
...@@ -265,7 +265,7 @@ class CounterMP : public CounterBase { public: ...@@ -265,7 +265,7 @@ class CounterMP : public CounterBase { public:
void* cast(const char *name) void* cast(const char *name)
{ {
if (strcmp("CounterMP", name) == 0) if (strcmp("CounterMP", name) == 0)
return (CounterMP *)this; return (CounterMPBase<T> *)this;
else else
return CounterBase::cast(name); return CounterBase::cast(name);
} }
...@@ -329,11 +329,30 @@ class CounterMP : public CounterBase { public: ...@@ -329,11 +329,30 @@ class CounterMP : public CounterBase { public:
} }
protected: protected:
rXwlock _atomic_lock CLICK_CACHE_ALIGN; T _atomic_lock CLICK_CACHE_ALIGN;
per_thread<stats> _stats CLICK_CACHE_ALIGN; per_thread<stats> _stats CLICK_CACHE_ALIGN;
}; };
class CounterRxWMP : public CounterMP { public:
class CounterMP : public CounterMPBase<rXwlockPR> { public:
};
template <typename T>
class CounterRxWMPBase : public CounterMPBase<T> { public:
CounterRxWMPBase() CLICK_COLD {
}
~CounterRxWMPBase() CLICK_COLD {
}
const char *processing() const { return Element::AGNOSTIC; }
const char *port_count() const { return Element::PORTS_1_1; }
};
class CounterRxWMP : public CounterRxWMPBase<rXwlock> { public:
CounterRxWMP() CLICK_COLD; CounterRxWMP() CLICK_COLD;
~CounterRxWMP() CLICK_COLD; ~CounterRxWMP() CLICK_COLD;
...@@ -343,6 +362,26 @@ class CounterRxWMP : public CounterMP { public: ...@@ -343,6 +362,26 @@ class CounterRxWMP : public CounterMP { public:
const char *port_count() const { return PORTS_1_1; } const char *port_count() const { return PORTS_1_1; }
}; };
class CounterRxWMPPR : public CounterRxWMPBase<rXwlockPR> { public:
CounterRxWMPPR() CLICK_COLD;
~CounterRxWMPPR() CLICK_COLD;
const char *class_name() const { return "CounterRxWMPPR"; }
const char *processing() const { return AGNOSTIC; }
const char *port_count() const { return PORTS_1_1; }
};
class CounterRxWMPPW : public CounterRxWMPBase<rXwlockPW> { public:
CounterRxWMPPW() CLICK_COLD;
~CounterRxWMPPW() CLICK_COLD;
const char *class_name() const { return "CounterRxWMPPW"; }
const char *processing() const { return AGNOSTIC; }
const char *port_count() const { return PORTS_1_1; }
};
class CounterLockMP : public CounterBase { public: class CounterLockMP : public CounterBase { public:
CounterLockMP() CLICK_COLD; CounterLockMP() CLICK_COLD;
......
...@@ -641,12 +641,12 @@ class __rwlock : public RWLock { public: ...@@ -641,12 +641,12 @@ class __rwlock : public RWLock { public:
* If max_writer is 1, this becomes rwlock, but with a priority on the reads * If max_writer is 1, this becomes rwlock, but with a priority on the reads
*/ */
class rXwlock { public: class rXwlockPR { public:
rXwlock() : max_write(-65535) { rXwlockPR() : max_write(-65535) {
_refcnt = 0; _refcnt = 0;
} }
rXwlock(int32_t max_writers) { rXwlockPR(int32_t max_writers) {
_refcnt = 0; _refcnt = 0;
set_max_writers(max_writers); set_max_writers(max_writers);
} }
...@@ -715,6 +715,140 @@ private: ...@@ -715,6 +715,140 @@ private:
int32_t max_write; int32_t max_write;
} CLICK_CACHE_ALIGN; } CLICK_CACHE_ALIGN;
/**
* Read XOR Write lock. Allow either multiple reader or multiple
* writer. When a reader arrives, writers stop taking the usecount. The reader
* has access once all writer finish.
*
* To stop writer from locking, the reader will CAS a very low value.
*
* If max_writer is 1, this becomes rwlock, but with a priority on the reads
*/
class rXwlockPW { public:
rXwlockPW() : max_write(-65535) {
_refcnt = 0;
}
rXwlockPW(int32_t max_writers) {
_refcnt = 0;
set_max_writers(max_writers);
}
void set_max_writers(int32_t max_writers) {
assert(max_writers < 65535);
write_begin();
max_write = - max_writers;
write_end();
}
inline void write_begin() {
uint32_t current_refcnt;
do {
current_refcnt = _refcnt;
if (unlikely((int32_t)current_refcnt < 0)) {
if ((int32_t)current_refcnt <= -65536) {
//Just wait for the other reader out there to win
} else {
if (_refcnt.compare_swap(current_refcnt,current_refcnt - 65536) == current_refcnt) {
//We could lower the value, so wait for it to reach -65536 (0 writer but one reader waiting) and continue
do {
click_relax_fence();
} while((int32_t)_refcnt != -65536);
//When it is -65536, driver cannot take it and reader are waiting, so we can set it directly
_refcnt = 1;
break;
}
}
} else { // >= 0, just grab another reader (>0)
if (likely(_refcnt.compare_swap(current_refcnt,current_refcnt+1) == current_refcnt))
break;
}
click_relax_fence();
} while (1);
}
inline void write_end() {
click_write_fence();
_refcnt--;
}
inline void write_get() {
_refcnt++;
}
inline void read_begin() {
uint32_t current_refcnt;
do {
current_refcnt = _refcnt;
if (likely((int32_t)current_refcnt <= 0 && (int32_t)current_refcnt > max_write)) {
if (_refcnt.compare_swap(current_refcnt,current_refcnt - 1) == current_refcnt)
break;
}
click_relax_fence();
} while (1);
}
inline void read_end() {
click_read_fence();
_refcnt++;
}
private:
atomic_uint32_t _refcnt;
int32_t max_write;
} CLICK_CACHE_ALIGN;
class rXwlock { public:
rXwlock() {
_refcnt = 0;
}
inline void read_begin() {
uint32_t current_refcnt;
current_refcnt = _refcnt;
while ((int32_t)current_refcnt < 0 || _refcnt.compare_swap(current_refcnt,current_refcnt+1) != current_refcnt) {
click_relax_fence();
current_refcnt = _refcnt;
}
}
void set_max_writers(int32_t max_writers) {
}
inline void read_end() {
click_read_fence();
_refcnt--;
}
inline void read_get() {
_refcnt++;
}
inline void write_begin() {
uint32_t current_refcnt;
current_refcnt = _refcnt;
while ((int32_t)current_refcnt > 0 || _refcnt.compare_swap(current_refcnt,current_refcnt-1) != current_refcnt) {
click_relax_fence();
current_refcnt = _refcnt;
}
}
inline void write_end() {
click_write_fence();
_refcnt++;
}
private:
atomic_uint32_t _refcnt;
} CLICK_CACHE_ALIGN;
/** /**
* Shared-pointer based rwlock * Shared-pointer based rwlock
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment