Commit 148d26aa authored by Tom Barbette's avatar Tom Barbette

Add PRWMP vs RWMP

parent ff11fe2f
Pipeline #1844 passed with stage
in 12 minutes and 4 seconds
......@@ -402,6 +402,87 @@ CounterRxWMP::CounterRxWMP()
CounterRxWMP::~CounterRxWMP()
{
}
CounterRWMP::CounterRWMP()
{
_atomic = 1;
}
CounterRWMP::~CounterRWMP()
{
}
CounterPRWMP::CounterPRWMP()
{
_atomic = 2;
}
CounterPRWMP::~CounterPRWMP()
{
}
int
CounterRWMP::initialize(ErrorHandler *errh) {
if (CounterBase::initialize(errh) != 0)
return -1;
//If not in simple mode, we only allow one writer so we can sum up the total number of threads
return 0;
}
Packet*
CounterRWMP::simple_action(Packet *p)
{
_stats->lock.acquire();
_stats->s._count++;
_stats->s._byte_count += p->length();
_stats->lock.release();
if (unlikely(!_simple))
check_handlers(CounterRWMP::count(), CounterRWMP::byte_count()); //BUG : if not atomic, then handler may be called twice
return p;
}
#if HAVE_BATCH
PacketBatch*
CounterRWMP::simple_action_batch(PacketBatch *batch)
{
if (unlikely(_batch_precise)) {
FOR_EACH_PACKET(batch, p)
CounterRWMP::simple_action(p);
return batch;
}
counter_int_type bc = 0;
FOR_EACH_PACKET(batch,p) {
bc += p->length();
}
_stats->lock.acquire();
_stats->s._count += batch->count();
_stats->s._byte_count += bc;
_stats->lock.release();
if (unlikely(!_simple))
check_handlers(CounterRWMP::count(), CounterRWMP::byte_count());
return batch;
}
#endif
void
CounterRWMP::reset()
{
acquire();
for (unsigned i = 0; i < _stats.weight(); i++) { \
_stats.get_value(i).s._count = 0;
_stats.get_value(i).s._byte_count = 0;
}
release();
CounterBase::reset();
}
/*
CounterRCUMP::CounterRCUMP() : _stats()
{
......@@ -753,6 +834,11 @@ EXPORT_ELEMENT(CounterMP)
ELEMENT_MT_SAFE(CounterMP)
EXPORT_ELEMENT(CounterRxWMP)
ELEMENT_MT_SAFE(CounterRxWMP)
EXPORT_ELEMENT(CounterRWMP)
ELEMENT_MT_SAFE(CounterRWMP)
EXPORT_ELEMENT(CounterPRWMP)
ELEMENT_MT_SAFE(CounterPRWMP)
EXPORT_ELEMENT(CounterRW)
ELEMENT_MT_SAFE(CounterRW)
EXPORT_ELEMENT(CounterPRW)
......
......@@ -342,6 +342,120 @@ class CounterRxWMP : public CounterMP { public:
const char *processing() const { return AGNOSTIC; }
const char *port_count() const { return PORTS_1_1; }
};
class CounterRWMP : public CounterBase { public:
CounterRWMP() CLICK_COLD;
~CounterRWMP() CLICK_COLD;
const char *class_name() const { return "CounterRWMP"; }
const char *processing() const { return AGNOSTIC; }
const char *port_count() const { return PORTS_1_1; }
int initialize(ErrorHandler *) CLICK_COLD;
int can_atomic() { return 2; } CLICK_COLD;
Packet *simple_action(Packet *);
#if HAVE_BATCH
PacketBatch *simple_action_batch(PacketBatch* batch);
#endif
void reset();
counter_int_type count() override {
PER_THREAD_MEMBER_SUM(counter_int_type,sum,_stats,s._count);
return sum;
}
counter_int_type byte_count() override {
PER_THREAD_MEMBER_SUM(counter_int_type,sum,_stats,s._byte_count);
return sum;
}
stats read() {
counter_int_type count = 0;
counter_int_type byte_count = 0;
for (unsigned i = 0; i < _stats.weight(); i++) { \
count += _stats.get_value(i).s._count;
byte_count += _stats.get_value(i).s._byte_count;
}
return {count,byte_count};
}
inline void acquire() {
loop:
for (unsigned i = 0; i < _stats.weight(); i++) {
if (!_stats.get_value(i).lock.attempt()) {
for (unsigned j = 0; j < i; j++) {
_stats.get_value(j).lock.release();
}
goto loop;
}
}
}
inline void release() {
for (unsigned i = 0; i < _stats.weight(); i++)
_stats.get_value(i).lock.release();
}
stats atomic_read() {
counter_int_type count = 0;
counter_int_type byte_count = 0;
if (_atomic == 2) {
acquire();
for (unsigned i = 0; i < _stats.weight(); i++) {
count += _stats.get_value(i).s._count;
byte_count += _stats.get_value(i).s._byte_count;
}
release();
} else {
for (unsigned i = 0; i < _stats.weight(); i++) {
_stats.get_value(i).lock.acquire();
count += _stats.get_value(i).s._count;
byte_count += _stats.get_value(i).s._byte_count;
_stats.get_value(i).lock.release();
}
}
return {count,byte_count};
}
void add(stats s) override {
_stats->s._count += s._count;
_stats->s._byte_count += s._byte_count;
}
void atomic_add(stats s) override {
_stats->lock.acquire();
_stats->s._count += s._count;
_stats->s._byte_count += s._byte_count;
_stats->lock.release();
}
protected:
class bucket { public:
bucket() : s(), lock() {
}
stats s;
Spinlock lock;
};
per_thread<bucket> _stats CLICK_CACHE_ALIGN;
};
class CounterPRWMP : public CounterRWMP { public:
CounterPRWMP() CLICK_COLD;
~CounterPRWMP() CLICK_COLD;
const char *class_name() const { return "CounterPRWMP"; }
const char *processing() const { return AGNOSTIC; }
const char *port_count() const { return PORTS_1_1; }
};
/*
class CounterRCUMP : public CounterBase { public:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment