Commit 8de1ea5f authored by Tom Barbette's avatar Tom Barbette

Support for DPDK and NETMAP as QueueDevice, with support for full push mode

parent ed2762df
......@@ -41,6 +41,12 @@
/* Define if you have the <dlfcn.h> header file. */
#undef HAVE_DLFCN_H
/* Define if a Click user-level driver uses Intel DPDK. */
#undef HAVE_DPDK
/* Define if a Click user-level driver uses Netmap. */
#undef HAVE_NETMAP
/* Define if dynamic linking is possible. */
#undef HAVE_DYNAMIC_LINKING
......@@ -220,6 +226,9 @@
/* Define if you have the vsnprintf function. */
#undef HAVE_VSNPRINTF
/* Define if Zero-copy is enabled. */
#undef HAVE_ZEROCOPY
/* The size of a `click_jiffies_t', as computed by sizeof. */
#define SIZEOF_CLICK_JIFFIES_T SIZEOF_INT
......
......@@ -702,6 +702,7 @@ minios_dir
xen_dir
INCLUDE_KSYMS
LINUXMODULE_FIXINCLUDES
USE_DPDK
PTHREAD_LIBS
AR_CREATEFLAGS
STRIP
......@@ -791,6 +792,7 @@ enable_user_multithread
enable_select
enable_poll
enable_kqueue
enable_dpdk
enable_linuxmodule
enable_fixincludes
enable_multithread
......@@ -816,6 +818,7 @@ enable_ethernet
enable_etherswitch
enable_grid
enable_icmp
enable_zerocopy
enable_ip
enable_ip6
enable_ipsec
......@@ -1483,6 +1486,7 @@ Optional Features:
--disable-select do not use select()
--disable-poll do not use poll()
--disable-kqueue do not use kqueue()
--enable-dpdk use Intel DPDK
--disable-linuxmodule disable Linux kernel driver
--disable-fixincludes do not patch Linux kernel headers for C++
--enable-multithread support kernel multithreading
......@@ -1504,6 +1508,7 @@ Optional Features:
--enable-etherswitch include Ethernet switch elements
--enable-grid include Grid elements (see FAQ)
--disable-icmp do not include ICMP elements
--disable-zerocopy do not use zero copy
--disable-ip do not include IP elements
--enable-ip6 include IPv6 elements
--enable-ipsec include IP security elements
......@@ -6478,6 +6483,46 @@ $as_echo "#define HAVE_ALLOW_KQUEUE 1" >>confdefs.h
fi
# Check whether --enable-dpdk was given.
if test "${enable_dpdk+set}" = set; then :
enableval=$enable_dpdk; :
else
enable_dpdk=no
fi
if test "x$enable_dpdk" = "xyes"; then
if test "x$enable_user_multithread" != "xyes"; then
as_fn_error $? "
=========================================
--enable-dpdk requires --enable-user-multithread which was not provided.
=========================================" "$LINENO" 5
fi
if test ! -f "$RTE_SDK/$RTE_TARGET/include/rte_eal.h"; then
as_fn_error $? "
=========================================
Cannot find \$RTE_SDK/\$RTE_TARGET/include/rte_eal.h for Intel DPDK.
Define \$RTE_SDK and \$RTE_TARGET as per Intel DPDK documentation.
=========================================" "$LINENO" 5
fi
if test ! -f "$RTE_SDK/$RTE_TARGET/lib/librte_eal.a"; then
as_fn_error $? "
=========================================
Cannot find \$RTE_SDK/\$RTE_TARGET/lib/librte_eal.a for Intel DPDK.
Define \$RTE_SDK and \$RTE_TARGET as per Intel DPDK documentation.
=========================================" "$LINENO" 5
fi
$as_echo "#define HAVE_DPDK 1" >>confdefs.h
USE_DPDK=yes
fi
# Check whether --enable-linuxmodule was given.
......@@ -6919,6 +6964,18 @@ test "x$enable_all_elements" = xyes -a \( "x$enable_icmp" = xNO -o "x$enable_icm
if test "x$enable_icmp" = xyes; then
:
fi
# Check whether --enable-zerocopy was given.
if test "${enable_zerocopy+set}" = set; then :
enableval=$enable_zerocopy;
else
enable_zerocopy=yes
fi
test "x$enable_all_elements" = xyes -a \( "x$enable_zerocopy" = xNO -o "x$enable_zerocopy" = x \) && enable_zerocopy=yes
if test "x$enable_zerocopy" = xyes; then
:
$as_echo "#define HAVE_ZEROCOPY 1" >>confdefs.h
fi
# Check whether --enable-ip was given.
if test "${enable_ip+set}" = set; then :
......@@ -13311,6 +13368,10 @@ if test "x$enable_analysis" = xyes; then
provisions="$provisions analysis"
fi
if test "x$enable_dpdk" = xyes; then
provisions="$provisions dpdk"
fi
if test "x$enable_experimental" = xyes; then
provisions="$provisions experimental"
fi
......
......@@ -135,6 +135,40 @@ if echo "$enable_select" | grep kqueue >/dev/null 2>&1 && test "$enable_kqueue"
AC_DEFINE([HAVE_ALLOW_KQUEUE], [1], [Define if kqueue() may be used to wait for file descriptor events.])
fi
dnl Intel DPDK support for the user-level driver (off unless --enable-dpdk).
dnl The option requires --enable-user-multithread and a DPDK build located
dnl through the RTE_SDK and RTE_TARGET environment variables; both the EAL
dnl header and the EAL static library must be present.
AC_ARG_ENABLE([dpdk],
    [AS_HELP_STRING([  --enable-dpdk], [use Intel DPDK])],
    [:], [enable_dpdk=no])
if test "x$enable_dpdk" = "xyes"; then
    if test "x$enable_user_multithread" != "xyes"; then
        AC_MSG_ERROR([
=========================================
--enable-dpdk requires --enable-user-multithread which was not provided.
=========================================])
    fi
    if test ! -f "$RTE_SDK/$RTE_TARGET/include/rte_eal.h"; then
        AC_MSG_ERROR([
=========================================
Cannot find \$RTE_SDK/\$RTE_TARGET/include/rte_eal.h for Intel DPDK.
Define \$RTE_SDK and \$RTE_TARGET as per Intel DPDK documentation.
=========================================])
    fi
    if test ! -f "$RTE_SDK/$RTE_TARGET/lib/librte_eal.a"; then
        AC_MSG_ERROR([
=========================================
Cannot find \$RTE_SDK/\$RTE_TARGET/lib/librte_eal.a for Intel DPDK.
Define \$RTE_SDK and \$RTE_TARGET as per Intel DPDK documentation.
=========================================])
    fi
    dnl Spell out value and description, like the other AC_DEFINE calls in
    dnl this file; the generated "#define HAVE_DPDK 1" is unchanged.
    AC_DEFINE([HAVE_DPDK], [1], [Define if a Click user-level driver uses Intel DPDK.])
    AC_SUBST(USE_DPDK, yes)
fi
dnl linuxmodule driver and features
......@@ -1956,6 +1990,11 @@ if test "x$enable_analysis" = xyes; then
provisions="$provisions analysis"
fi
dnl add 'dpdk' to the provisions list if compiled with --enable-dpdk
dnl (FromDpdkDevice declares ELEMENT_REQUIRES(userlevel dpdk))
if test "x$enable_dpdk" = xyes; then
provisions="$provisions dpdk"
fi
dnl add 'experimental' if --enable-experimental was supplied
if test "x$enable_experimental" = xyes; then
provisions="$provisions experimental"
......
......@@ -72,6 +72,21 @@ StaticThreadSched::configure(Vector<String> &conf, ErrorHandler *errh)
return 0;
}
/**
 * Report which threads have elements statically pinned to them.
 *
 * Starts from the answer of the chained scheduler (_next_thread_sched), if
 * any, and then sets the bit of every thread named in _thread_preferences.
 * The vector is grown when a preference names a thread beyond its current
 * size.
 *
 * @return a Bitvector in which bit t is set when thread t has been assigned.
 */
Bitvector StaticThreadSched::assigned_thread() {
    Bitvector v(nthreads, false);
    if (_next_thread_sched) {
        v = _next_thread_sched->assigned_thread();
    }
    for (int i = 0; i < _thread_preferences.size(); i++) {
        const int tid = _thread_preferences[i];
        // Skip entries without a preference, and any negative id: a negative
        // value cannot be used as a bit index.
        if (tid == THREAD_UNKNOWN || tid < 0)
            continue;
        if (v.size() <= tid)
            v.resize(tid + 1);
        // Mark the thread as taken. Storing the loop index here (as before)
        // left the bit clear whenever the pinned entry had index 0.
        v[tid] = true;
    }
    return v;
}
int
StaticThreadSched::initial_home_thread_id(const Element *e)
{
......
......@@ -29,6 +29,8 @@ class StaticThreadSched : public Element, public ThreadSched { public:
int initial_home_thread_id(const Element *e);
Bitvector assigned_thread();
private:
Vector<int> _thread_preferences;
ThreadSched *_next_thread_sched;
......
/*
* fromdpdkdevice.{cc,hh} -- element reads packets live from network via
* Intel's DPDK
*
* Copyright (c) 2014-2015 University of Liège
* Copyright (c) 2014 Cyril Soldani
* Copyright (c) 2015 Tom Barbette
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, subject to the conditions
* listed in the Click LICENSE file. These conditions include: you must
* preserve this copyright notice, and you cannot mention the copyright
* holders in advertising related to the Software without their permission.
* The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
* notice is a summary of the Click LICENSE file; the license in that file is
* legally binding.
*/
#include <click/config.h>
#include <click/args.hh>
#include <click/error.hh>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#include "fromdpdkdevice.hh"
CLICK_DECLS
/*
 * Set the defaults used when the configuration string omits an argument:
 * port 0, promiscuous mode enabled, receive burst of 32 packets.
 *
 * NOTE(review): ndesc is read by configure() and tested by initialize() but
 * is not initialized here -- confirm that the base class gives it a value.
 */
FromDpdkDevice::FromDpdkDevice()
: _port_no(0), _promisc(true), _burst(32)
{
}
/*
 * Nothing to release: the members declared by this class are plain values
 * (port number, promiscuous flag, burst size).
 */
FromDpdkDevice::~FromDpdkDevice()
{
}
/*
 * Parse the element configuration and register this input with QueueDevice.
 *
 * DEVNAME is mandatory; PROMISC, BURST, MAXTHREADS and THREADOFFSET may be
 * given positionally; MINQUEUES, MAXQUEUES and NDESC are keyword-only.
 * The NUMA node of the port is looked up and forwarded to
 * QueueDevice::configure_rx() together with the thread and queue limits.
 *
 * Returns 0 on success, a negative value when parsing fails or BURST is 0,
 * otherwise whatever QueueDevice::configure_rx() returns.
 */
int FromDpdkDevice::configure(Vector<String> &conf, ErrorHandler *errh)
{
    // Defaults. For the thread parameters, -1 means "not specified":
    // QueueDevice::configure_rx() tests for that value.
    int maxthreads = -1;
    int threadoffset = -1;
    int minqueues = 1;
    int maxqueues = 128; //TODO Should be device dependent

    if (Args(conf, this, errh)
        .read_mp("DEVNAME", _port_no)
        .read_p("PROMISC", _promisc)
        .read_p("BURST", _burst)
        .read_p("MAXTHREADS", maxthreads)
        .read_p("THREADOFFSET", threadoffset)
        .read("MINQUEUES",minqueues)
        .read("MAXQUEUES",maxqueues)
        .read("NDESC",ndesc)
        .complete() < 0)
        return -1;

    // run_task() sizes its receive array with _burst and asks the driver for
    // at most _burst packets, so a zero burst could never receive anything.
    if (_burst == 0)
        return errh->error("BURST must be at least 1");

    int numa_node = DpdkDevice::get_port_numa_node(_port_no);
    return QueueDevice::configure_rx(numa_node,maxthreads,minqueues,maxqueues,threadoffset,errh);
}
/*
 * Bring the receive side up, in this order: QueueDevice RX setup, one
 * DpdkDevice RX registration per queue, the optional descriptor count,
 * the tasks, and finally the global DpdkDevice initialization once
 * all_initialized() reports true.
 *
 * The first non-zero status of any step is returned as is; 0 means success.
 */
int FromDpdkDevice::initialize(ErrorHandler *errh)
{
    if (int status = QueueDevice::initialize_rx(errh))
        return status;

    for (int queue = 0; queue < nqueues; ++queue) {
        if (int status = DpdkDevice::add_rx_device(_port_no, queue, _promisc, errh))
            return status;
    }

    // A non-positive ndesc leaves the descriptor count untouched.
    if (ndesc > 0)
        DpdkDevice::set_rx_descs(_port_no, ndesc, errh);

    if (int status = QueueDevice::initialize_tasks(true, errh))
        return status;

    // Only the call that finds everything initialized starts DpdkDevice.
    if (!all_initialized())
        return 0;
    return DpdkDevice::initialize(errh);
}
/*
 * Register the element handlers: "count" (read) and "reset_counts" (write,
 * flagged Handler::BUTTON).
 *
 * NOTE(review): count_handler and reset_count_handler are not defined in this
 * file -- presumably they are inherited from QueueDevice; confirm.
 */
void FromDpdkDevice::add_handlers()
{
add_read_handler("count", count_handler, 0);
add_write_handler("reset_counts", reset_count_handler, 0, Handler::BUTTON);
}
/*
 * Poll every hardware queue served by the calling thread once.
 *
 * Up to _burst mbufs are fetched per queue; each one is copied into a newly
 * allocated Click packet, the mbuf is released, and the packet is pushed to
 * output 0 with its packet type annotation set to HOST.
 *
 * Returns true when at least one queue delivered packets. The task is always
 * rescheduled.
 */
bool FromDpdkDevice::run_task(Task * t)
{
    struct rte_mbuf *pkts[_burst];
    bool received = false;

    for (int iqueue = queue_for_thread_begin(); iqueue <= queue_for_thread_end(); iqueue++) {
        unsigned n = rte_eth_rx_burst(_port_no, iqueue, pkts, _burst);
        for (unsigned i = 0; i < n; ++i) {
            // NOTE(review): pkt_len bytes are copied starting at the first
            // segment; this assumes single-segment mbufs -- confirm that
            // scattered receive is not enabled for the port.
            WritablePacket *p = Packet::make((void*)rte_pktmbuf_mtod(pkts[i], unsigned char *),
                                             (uint32_t)rte_pktmbuf_pkt_len(pkts[i]));
            // The data has been copied (or could not be): release the mbuf
            // in both cases so the pool is not drained.
            rte_pktmbuf_free(pkts[i]);
            // Packet::make() yields a null pointer when no packet can be
            // allocated; drop the frame instead of dereferencing it.
            if (!p)
                continue;
            p->set_packet_type_anno(HOST);
            output(0).push(p);
        }
        if (n) {
            add_count(n);
            received = true;
        }
    }

    /*We reschedule directly, as we cannot know if there is actually packet
     * available and dpdk has no select mechanism*/
    t->fast_reschedule();
    return received;
}
CLICK_ENDDECLS
ELEMENT_REQUIRES(userlevel dpdk)
EXPORT_ELEMENT(FromDpdkDevice)
ELEMENT_MT_SAFE(FromDpdkDevice)
#ifndef CLICK_FROMDPDKDEVICE_HH
#define CLICK_FROMDPDKDEVICE_HH
#include <click/batchelement.hh>
#include <click/notifier.hh>
#include <click/task.hh>
#include <click/dpdkdevice.hh>
#include "kernelfilter.hh"
#include "queuedevice.hh"
CLICK_DECLS
/*
* =title FromDpdkDevice
*
* =c
*
* FromDpdkDevice(DEVNAME [, PROMISC, BURST, MAXTHREADS, THREADOFFSET, I<keywords> MINQUEUES, MAXQUEUES, NDESC])
*
* =s netdevices
*
* reads packets from network device using Intel's DPDK (user-level)
*
* =d
*
* Reads packets from the network controller named DEVNAME, using Intel's DPDK.
*
* Unlike FromDevice.u, which acts as a sniffer by default, packets received by
* devices put in DPDK mode will NOT be received by the kernel, and will thus
* be processed only once.
*
* Keyword arguments are:
*
* =over 8
*
* =item DEVNAME
*
* Integer. DPDK port number of the device to read from.
*
* =item PROMISC
*
* Boolean. FromDpdkDevice puts the device in promiscuous mode if PROMISC is
* true. The default is true.
*
* =item BURST
*
* Integer. Maximal number of packets read from a hardware queue in a single
* receive burst. The default is 32.
*
* =item MAXTHREADS
*
* Maximal number of threads that this element will use to read packets from
* the input queues. If unset (or negative), all threads not pinned with a
* ThreadScheduler element will be shared among FromDpdkDevice elements and
* other input elements supporting multiqueue (extending QueueDevice).
*
* =item THREADOFFSET
*
* Specify which Click thread will handle this element. If the element uses
* j threads, threads THREADOFFSET to THREADOFFSET+j-1 will be used. Default is
* to share the threads available on the device's NUMA node equally.
*
* =item MINQUEUES
*
* Minimum number of hardware queues of the device to use. Multiple queues
* allow the traffic to be load-balanced across multiple threads using RSS.
* Default is 1.
*
* =item MAXQUEUES
*
* Maximum number of hardware queue to use. Default is 128.
*
* =item NDESC
*
* Number of descriptors per ring. If unset, zero or negative, the default
* amount will be used.
*
* =back
*
* This element is only available at user level, when compiled with DPDK
* support.
*
* =e
*
* FromDpdkDevice() ->
*
* =n
*
* FromDpdkDevice copies each received frame into a new Click packet and sets
* its packet type annotation to HOST.
*
* =h count read-only
*
* Returns the number of packets read by the device.
*
* =h reset_counts write-only
*
* Resets "count" to zero.
*
* =a ToDpdkDevice
*/
class FromDpdkDevice : public QueueDevice {
  public:

    FromDpdkDevice() CLICK_COLD;
    ~FromDpdkDevice() CLICK_COLD;

    // Element description: class name, port specification and processing
    // mode (push).
    const char *class_name() const { return "FromDpdkDevice"; }
    const char *port_count() const { return PORTS_0_1; }
    const char *processing() const { return PUSH; }

    // Configured in the same phase as the other "from device" elements.
    int configure_phase() const {
        return KernelFilter::CONFIGURE_PHASE_FROMDEVICE;
    }

    // The element cannot be reconfigured while the router is running.
    bool can_live_reconfigure() const { return false; }

    int configure(Vector<String> &, ErrorHandler *) CLICK_COLD;
    int initialize(ErrorHandler *) CLICK_COLD;
    void add_handlers() CLICK_COLD;

    // Polls the queues of the calling thread; see the implementation file.
    bool run_task(Task *);

  private:

    // NOTE(review): add_handlers() registers count_handler and
    // reset_count_handler rather than these two -- confirm whether these
    // declarations are still needed.
    static String read_handler(Element*, void*) CLICK_COLD;
    static int write_handler(const String&, Element*, void*, ErrorHandler*)
        CLICK_COLD;

    unsigned _port_no;      // port read from (DEVNAME argument)
    bool _promisc;          // PROMISC argument
    unsigned int _burst;    // BURST argument: packets requested per receive call
};
CLICK_ENDDECLS
#endif // CLICK_FROMDPDKDEVICE_HH
#ifndef CLICK_NETMAPINFO_HH
#define CLICK_NETMAPINFO_HH 1
#include <click/netmapdevice.hh>
#if HAVE_NET_NETMAP_H
#include <net/if.h>
#include <net/netmap.h>
#include <net/netmap_user.h>
// XXX bug in netmap_user.h , the prototype should be available
#ifndef NETMAP_WITH_LIBS
......@@ -16,74 +19,6 @@ typedef void (*nm_cb_t)(u_char *, const struct nm_pkthdr *, const u_char *d);
#include <click/error.hh>
CLICK_DECLS
/*
 * A queue of netmap buffers, by index.
 *
 * The queue is a linked list threaded through the buffers themselves: the
 * first word of every queued buffer holds the index of the next one.
 * insert() prepends and extract() removes from the head. Index 0 is both the
 * end-of-list marker and the "nothing available" result of extract().
 *
 * init() must be called before any other method: there is no constructor, so
 * the members hold no defined value until then.
 */
class NetmapBufQ {
    unsigned char *buf_start;   /* base address */
    unsigned int buf_size;
    unsigned int max_index;     /* error checking */
    unsigned char *buf_end;     /* error checking */
    unsigned int head;          /* index of first buffer */
    unsigned int tail;          /* index of last buffer */
    unsigned int count;         /* how many ? */

    /* Start address of buffer idx. The offset is computed in size_t: in
     * 32-bit unsigned arithmetic idx * buf_size wraps once the buffer region
     * reaches 4 GiB, which would yield the address of a different buffer. */
    inline unsigned char *buffer_at(unsigned int idx) const {
        return buf_start + static_cast<size_t>(idx) * buf_size;
    }

  public:
    /* Prepend buffer idx to the queue.
     * Returns 0 on success, 1 when idx is out of range. */
    inline unsigned int insert(unsigned int idx) {
        if (idx >= max_index) {
            return 1; // error
        }
        unsigned int *p = reinterpret_cast<unsigned int *>(buffer_at(idx));
        // prepend
        *p = head;
        if (head == 0) {
            tail = idx;
        }
        head = idx;
        count++;
        return 0;
    }

    /* Same as insert(), for a buffer given by address.
     * Returns 1 when p lies outside [buf_start, buf_end). */
    inline unsigned int insert_p(unsigned char *p) {
        if (p < buf_start || p >= buf_end)
            return 1;
        return insert((p - buf_start) / buf_size);
    }

    /* Remove and return the index at the head of the queue,
     * or 0 when the queue is empty. */
    inline unsigned int extract() {
        if (count == 0)
            return 0;
        unsigned int idx = head;
        unsigned int *p = reinterpret_cast<unsigned int *>(buffer_at(idx));
        head = *p;
        count--;
        return idx;
    }

    /* Same as extract(), returning the buffer address (0 when empty). */
    inline unsigned char * extract_p() {
        unsigned int idx = extract();
        return (idx == 0) ? 0 : buffer_at(idx);
    }

    /* Attach the queue to the buffer region [beg, end) made of buffers of
     * _size bytes and empty it.
     * Returns 0 on success; returns 1 and leaves the queue unusable
     * (max_index == 0) when the arguments are rejected. */
    inline int init (void *beg, void *end, uint32_t _size) {
        click_chatter("Initializing NetmapBufQ %p size %d mem %p %p\n",
                      this, _size, beg, end);
        head = tail = max_index = 0;
        count = 0;
        buf_size = 0;
        buf_start = buf_end = 0;
        if (_size == 0 || _size > 0x10000 ||
            beg == 0 || end == 0 || end < beg) {
            click_chatter("NetmapBufQ %p bad args: size %d mem %p %p\n",
                          this, _size, beg, end);
            return 1;
        }
        buf_size = _size;
        buf_start = reinterpret_cast<unsigned char *>(beg);
        buf_end = reinterpret_cast<unsigned char *>(end);
        // Indices are unsigned int: clamp rather than let the buffer count
        // wrap when the region holds more buffers than that type can index.
        const size_t nbufs = static_cast<size_t>(buf_end - buf_start) / buf_size;
        max_index = (nbufs > ~0U) ? ~0U : static_cast<unsigned int>(nbufs);
        return 0;
    }
};
/* a netmap port as returned by nm_open */
class NetmapInfo { public:
......
// -*- c-basic-offset: 4; related-file-name: "queuedevice.hh" -*-
/*
* queuedevice.{cc,hh} -- Base element for multiqueue/multichannel device
*
* Copyright (c) 2014 Tom Barbette, University of Liège
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, subject to the conditions
* listed in the Click LICENSE file. These conditions include: you must
* preserve this copyright notice, and you cannot mention the copyright
* holders in advertising related to the Software without their permission.
* The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
* notice is a summary of the Click LICENSE file; the license in that file is
* legally binding.
*/
#include <click/config.h>
#include "queuedevice.hh"
CLICK_DECLS
// Bookkeeping shared by all QueueDevice-derived elements.

// Incremented as elements finish initializing (e.g. by initialize_tx()).
int QueueDevice::n_initialized = 0;
// Incremented by configure_rx() and configure_tx().
int QueueDevice::n_elements = 0;
int QueueDevice::n_inputs = 0;
// Number of nodes with at least one input whose threads are auto-assigned.
int QueueDevice::use_nodes = 0;

// Per-node tables, sized by static_initialize(): the number of auto-assigned
// inputs on the node, and the next thread offset to hand out on the node.
Vector<int> QueueDevice::inputs_count;
Vector<int> QueueDevice::shared_offset;
/*
 * Size the per-node tables and zero every entry. The node count is
 * currently fixed to one.
 */
void QueueDevice::static_initialize() {
    const int node_count = 1;

    inputs_count.resize(node_count);
    inputs_count.fill(0);

    shared_offset.resize(node_count);
    shared_offset.fill(0);
}
int QueueDevice::configure_rx(int numa_node,unsigned int maxthreads, unsigned int minqueues, unsigned int maxqueues, unsigned int threadoffset, ErrorHandler *) {
_maxthreads = maxthreads;
_minqueues = minqueues;
_maxqueues = maxqueues;
_threadoffset = threadoffset;
usable_threads.assign(master()->nthreads(),false);
if (numa_node >= 0)
_this_node = numa_node;
else
_this_node = 0;
if (_maxthreads == -1 || _threadoffset == -1) {
inputs_count[_this_node] ++;
if (inputs_count[_this_node] == 1)
use_nodes++;
}
n_elements ++;
return 0;
}
/*
 * Record the transmit-side limits of this element, mark every thread of the
 * master as usable and count the element. Always returns 0.
 */
int QueueDevice::configure_tx(unsigned int maxthreads,unsigned int minqueues, unsigned int maxqueues, ErrorHandler *) {
    _maxthreads = maxthreads;
    _minqueues = minqueues;
    _maxqueues = maxqueues;

    usable_threads.assign(master()->nthreads(), true);

    ++n_elements;
    return 0;
}
/*
 * Decide how many threads and hardware queues the transmit side uses.
 *
 * The thread count is the number of usable threads, capped by _maxthreads
 * unless that is -1. The queue count is the thread count clamped to
 * [_minqueues, _maxqueues]. When there are fewer queues than threads, each
 * queue is shared by thread_share threads.
 *
 * Always returns 0.
 */
int QueueDevice::initialize_tx(ErrorHandler *) {
    int n_threads;
    if (_maxthreads == -1)
        n_threads = usable_threads.weight();
    else
        n_threads = min(_maxthreads,usable_threads.weight());
    // A limit of 0 (or no usable thread) would make the divisions below
    // divide by zero; fall back to one thread, as initialize_rx() does.
    if (n_threads < 1)
        n_threads = 1;

    nqueues = min(_maxqueues,n_threads);
    nqueues = max(_minqueues,nqueues);
    // Same protection for the queue count, used as a divisor below.
    if (nqueues < 1)
        nqueues = 1;

    queue_per_threads = nqueues / n_threads;
    if (queue_per_threads == 0) {
        queue_per_threads = 1;
        thread_share = n_threads / nqueues;
    }

    n_initialized++;

    // NOTE(review): thread_share is assigned only in the branch above; the
    // value printed otherwise is whatever it held before -- confirm that it
    // is initialized elsewhere.
    click_chatter("%s : OUTPUT Using %d threads. %d queues so %d queues for %d thread",name().c_str(),n_threads,nqueues,queue_per_threads,thread_share);
    return 0;
}
int QueueDevice::initialize_rx(ErrorHandler *errh) {
usable_threads.negate();
for (int i = nthreads; i < usable_threads.size(); i++)
usable_threads[i] = 0;
if (router()->thread_sched()) {
Bitvector v = router()->thread_sched()->assigned_thread();
if (v.size() < usable_threads.size())
v.resize(usable_threads.size());
if (v.weight() == usable_threads.weight())
click_chatter("Warning : input thread assignment will assign threads already assigned by yourself, as you didn't left any cores for %s",name().c_str());
else
usable_threads &= (~v);
}
int cores_in_node = usable_threads.weight();
int n_threads;
if (_maxthreads == -1) {
n_threads = min(cores_in_node,master()->nthreads() / use_nodes) / inputs_count[_this_node];
} else {
n_threads = min(cores_in_node,_maxthreads);
}
if (n_threads == 0) {
n_threads = 1;
thread_share = inputs_count[_this_node] / min(cores_in_node,master()->nthreads() / use_nodes);
}
if (n_threads > _maxqueues) {
queue_share = n_threads / _maxqueues;
}
if (_threadoffset == -1) {
_threadoffset = shared_offset[_this_node];
shared_offset[_this_node] += n_threads;
}
if (thread_share > 1)
_threadoffset = _threadoffset % (inputs_count[_this_node] / thread_share);
else
if (n_threads + _threadoffset > master()->nthreads())
_threadoffset = master()->nthreads() - n_threads;
nqueues = min(_maxqueues,n_threads);
nqueues = max(_minqueues,nqueues);
queue_per_threads = nqueues / n_threads;
if (queue_per_threads * n_threads < nqueues) queue_per_threads ++;
click_chatter("%s : INPUT Using thread %d to %d. %d queues per thread",name().c_str(),_threadoffset,_threadoffset + n_threads - 1,queue_per_threads);
int count = 0;
int offset = 0;