Commit b65eddfc authored by Tom Barbette's avatar Tom Barbette

Merge support for resize in TCPRetransmitter

parent 4da2496b
Pipeline #2096 failed with stage
in 11 minutes and 26 seconds
...@@ -55,7 +55,7 @@ int SimpleTCPRetransmitter::initialize(ErrorHandler *errh) { ...@@ -55,7 +55,7 @@ int SimpleTCPRetransmitter::initialize(ErrorHandler *errh) {
if (StackStateElement<SimpleTCPRetransmitter,fcb_transmit_buffer>::initialize(errh) != 0) if (StackStateElement<SimpleTCPRetransmitter,fcb_transmit_buffer>::initialize(errh) != 0)
return -1; return -1;
if (_in->getOutElement()->maxModificationLevel() & MODIFICATION_RESIZE) { if (_in->getOutElement()->maxModificationLevel() & MODIFICATION_RESIZE) {
return errh->error("SimpleTCPRetransmitter does not work when resizing the flow ! Use the non simple one."); _resize = true;
} }
return 0; return 0;
} }
...@@ -70,6 +70,38 @@ SimpleTCPRetransmitter::release_flow(fcb_transmit_buffer* fcb) { ...@@ -70,6 +70,38 @@ SimpleTCPRetransmitter::release_flow(fcb_transmit_buffer* fcb) {
return; return;
} }
void
SimpleTCPRetransmitter::forward_packets(fcb_transmit_buffer* fcb, PacketBatch* batch) {
/**
* Just prune the buffer and put the packet with payload in the list (don't buffer ACKs)
*/
prune(fcb);
FOR_EACH_PACKET_SAFE(batch,packet) {
if (getPayloadLength(packet) == 0)
continue;
Packet* clone = packet->clone(true); //Fast clone. If using DPDK, we only hold a buffer reference
assert(clone->buffer() == packet->buffer());
/*click_chatter("%p %p %p",clone->mac_header(),packet->mac_header());
clone->set_mac_header(packet->mac_header());
clone->set_network_header(packet->network_header());
clone->set_transport_header(packet->transport_header());*/
//Actually add the packet in the FCB
if (fcb->first_unacked) {
fcb->first_unacked->append_packet(clone);
clone->set_next(0);
} else {
fcb->first_unacked = PacketBatch::make_from_packet(clone);
}
}
//Send the original batch
if(batch != NULL)
output_push_batch(0, batch);
return;
}
void void
SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBatch *batch) SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBatch *batch)
{ {
...@@ -80,37 +112,12 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat ...@@ -80,37 +112,12 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat
} }
if(port == 0) { //Normal packet to buffer if(port == 0) { //Normal packet to buffer
/** forward_packets(fcb, batch);
* Just prune the buffer and put the packet with payload in the list (don't buffer ACKs)
*/
prune(fcb);
FOR_EACH_PACKET_SAFE(batch,packet) {
if (getPayloadLength(packet) == 0)
continue;
Packet* clone = packet->clone(true); //Fast clone. If using DPDK, we only hold a buffer reference
clone->set_mac_header(packet->mac_header());
clone->set_network_header(packet->network_header());
clone->set_transport_header(packet->transport_header());
//Actually add the packet in the FCB
if (fcb->first_unacked) {
fcb->first_unacked->append_packet(clone);
clone->set_next(0);
} else {
fcb->first_unacked = PacketBatch::make_from_packet(clone);
}
}
//Send the original batch
if(batch != NULL)
output_push_batch(0, batch);
return;
} else { /* port == 1 */ //Retransmission } else { /* port == 1 */ //Retransmission
auto fcb_in = _in->fcb_data(); //Scratchpad for TCPIn auto fcb_in = _in->fcb_data(); //Scratchpad for TCPIn
//Retransmission of a SYN -> Let it go through //Retransmission of a SYN -> Let it go through
if (fcb_in->common->state == TCPState::ESTABLISHING && isSyn(batch->first()) || isRst(batch->first())) { if ((fcb_in->common->state < TCPState::OPEN) && isSyn(batch->first()) || isRst(batch->first())) {
if (_verbose) if (_verbose)
click_chatter("Unestablished connection, letting the rt packet go through"); click_chatter("Unestablished connection, letting the rt packet go through");
if (batch->count() > 1) { if (batch->count() > 1) {
...@@ -136,12 +143,22 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat ...@@ -136,12 +143,22 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat
prune(fcb); prune(fcb);
Packet* lastretransmit = 0; Packet* lastretransmit = 0;
unsigned int flowDirection = determineFlowDirection();
ByteStreamMaintainer &maintainer = fcb_in->common->maintainers[flowDirection];
FOR_EACH_PACKET_SAFE(batch, packet) { FOR_EACH_PACKET_SAFE(batch, packet) {
uint32_t seq = getSequenceNumber(packet); uint32_t seq = getSequenceNumber(packet);
if (_proack && fcb_in->common->lastAckReceivedSet() && SEQ_LT(seq, fcb_in->common->getLastAckReceived(_in->getOppositeFlowDirection()))) { uint32_t mappedSeq;
if (_resize)
mappedSeq = maintainer.mapSeq(seq);
else
mappedSeq = seq;
if (_proack && fcb_in->common->lastAckReceivedSet() && SEQ_LT(mappedSeq, fcb_in->common->getLastAckReceived(_in->getOppositeFlowDirection()))) {
if (_verbose) if (_verbose)
click_chatter("Client just did not receive the ack, let's ACK him (seq %lu, last ack %lu, state %d)",seq,fcb_in->common->getLastAckReceived(_in->getOppositeFlowDirection()),fcb_in->common->state); click_chatter("Client just did not receive the ack, let's ACK him (seq %lu, last ack %lu, state %d)",seq,fcb_in->common->getLastAckReceived(_in->getOppositeFlowDirection()),fcb_in->common->state);
_in->ackPacket(packet,true); _in->ackPacket(packet,true);
continue; continue;
} }
if (getPayloadLength(packet) == 0) { if (getPayloadLength(packet) == 0) {
...@@ -156,7 +173,7 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat ...@@ -156,7 +173,7 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat
} else { } else {
//Seq is bigger than last ack, (the original of) this packet was lost before the dest reached it (or we never received the ack) //Seq is bigger than last ack, (the original of) this packet was lost before the dest reached it (or we never received the ack)
FOR_EACH_PACKET_SAFE(fcb->first_unacked, pr) { FOR_EACH_PACKET_SAFE(fcb->first_unacked, pr) {
if (getSequenceNumber(pr) == seq) { if (getSequenceNumber(pr) == mappedSeq) {
if (lastretransmit == pr) { //Avoid double retransmission if (lastretransmit == pr) { //Avoid double retransmission
//TODO : do we want to do that? //TODO : do we want to do that?
if (_verbose) if (_verbose)
...@@ -166,7 +183,7 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat ...@@ -166,7 +183,7 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat
if (unlikely(_verbose)) if (unlikely(_verbose))
click_chatter("Retransmitting one packet from the buffer (seq %lu)",getSequenceNumber(pr)); click_chatter("Retransmitting one packet from the buffer (seq %lu)",getSequenceNumber(pr));
fcb_acquire(1); fcb_acquire(1);
output_push_batch(0,PacketBatch::make_from_packet(packet->clone(true))); output_push_batch(0,PacketBatch::make_from_packet(pr->clone(true)));
} }
goto found; goto found;
} }
...@@ -183,9 +200,6 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat ...@@ -183,9 +200,6 @@ SimpleTCPRetransmitter::push_batch(int port, fcb_transmit_buffer* fcb, PacketBat
} }
} }
void SimpleTCPRetransmitter::prune(fcb_transmit_buffer* fcb) void SimpleTCPRetransmitter::prune(fcb_transmit_buffer* fcb)
{ {
if (!_in->fcb_data()->common->lastAckReceivedSet()) if (!_in->fcb_data()->common->lastAckReceivedSet())
......
...@@ -66,6 +66,7 @@ public: ...@@ -66,6 +66,7 @@ public:
static const int timeout = 0; //Timeout will be managed by TCP static const int timeout = 0; //Timeout will be managed by TCP
void forward_packets(fcb_transmit_buffer* fcb, PacketBatch* batch);
void release_flow(fcb_transmit_buffer*); void release_flow(fcb_transmit_buffer*);
...@@ -74,7 +75,7 @@ public: ...@@ -74,7 +75,7 @@ public:
return; return;
StackElement::addStackElementInList(element,port); StackElement::addStackElementInList(element,port);
} }
private: protected:
/** /**
* @brief Prune the buffer. Used when data are ACKed by the destination * @brief Prune the buffer. Used when data are ACKed by the destination
...@@ -94,6 +95,7 @@ private: ...@@ -94,6 +95,7 @@ private:
bool _verbose; bool _verbose;
bool _proack; bool _proack;
TCPIn* _in; TCPIn* _in;
bool _resize;
}; };
CLICK_ENDDECLS CLICK_ENDDECLS
......
...@@ -198,7 +198,6 @@ void StackElement::removeBytes(WritablePacket* packet, uint32_t position, ...@@ -198,7 +198,6 @@ void StackElement::removeBytes(WritablePacket* packet, uint32_t position,
position += packet->getContentOffset(); position += packet->getContentOffset();
uint32_t bytesAfter = packet->length() - position; uint32_t bytesAfter = packet->length() - position;
if (bytesAfter > 0) { if (bytesAfter > 0) {
//click_chatter("Bytes after !");
memmove(&source[position], &source[position + length], bytesAfter); memmove(&source[position], &source[position + length], bytesAfter);
} }
packet->take(length); packet->take(length);
......
...@@ -15,10 +15,6 @@ CLICK_DECLS ...@@ -15,10 +15,6 @@ CLICK_DECLS
WordMatcher::WordMatcher() : insults() WordMatcher::WordMatcher() : insults()
{ {
// Initialize the memory pool of each thread
for(unsigned int i = 0; i < poolBufferEntries.weight(); ++i)
poolBufferEntries.get_value(i).initialize(POOL_BUFFER_ENTRIES_SIZE);
closeAfterInsults = false; closeAfterInsults = false;
_mask = true; _mask = true;
_insert = false; _insert = false;
...@@ -91,8 +87,9 @@ void WordMatcher::push_batch(int port, fcb_WordMatcher* WordMatcher, PacketBatch ...@@ -91,8 +87,9 @@ void WordMatcher::push_batch(int port, fcb_WordMatcher* WordMatcher, PacketBatch
if (!iter.current()) { if (!iter.current()) {
goto finished; goto finished;
} }
/** /**
* This is mostly an example element, so we have two modes : * This is mostly an example element. We have two modes for demo:
* - Replacement, done inline using the iterator directly in this element * - Replacement, done inline using the iterator directly in this element
* - Removing, done calling iterator.remove * - Removing, done calling iterator.remove
*/ */
...@@ -137,19 +134,18 @@ void WordMatcher::push_batch(int port, fcb_WordMatcher* WordMatcher, PacketBatch ...@@ -137,19 +134,18 @@ void WordMatcher::push_batch(int port, fcb_WordMatcher* WordMatcher, PacketBatch
} }
} else { } else {
int result; int result;
FlowBufferContentIter iter;
do { do {
//iter = WordMatcher->flowBuffer.search(iter, insult, &result); //iter = WordMatcher->flowBuffer.search(iter, insult, &result);
iter = WordMatcher->flowBuffer.searchSSE(iter, insult, insults[i].length(), &result); iter = WordMatcher->flowBuffer.searchSSE(iter, insult, insults[i].length(), &result);
//click_chatter("Found %d at %d,",result,iter.current()?iter.leftInChunk():-1);
if (result == 1) { if (result == 1) {
if (!_insert) { //If not insert, just remove if (!_insert) { //If not insert, just remove
WordMatcher->flowBuffer.remove(iter,insults[i].length(), this); WordMatcher->flowBuffer.remove(iter,insults[i].length(), this);
while (iter.leftInChunk() == 0 && iter)
iter.moveToNextChunk();
} else if (!_full){ //Insert but not full, replace pattern per message } else if (!_full){ //Insert but not full, replace pattern per message
WordMatcher->flowBuffer.replaceInFlow(iter, insults[i].length(), _insert_msg.c_str(), _insert_msg.length(), this); WordMatcher->flowBuffer.replaceInFlow(iter, insults[i].length(), _insert_msg.c_str(), _insert_msg.length(), this);
} }
}
if (result == 1) {
if (closeAfterInsults) if (closeAfterInsults)
goto closeconn; goto closeconn;
if (_full) { if (_full) {
...@@ -159,8 +155,11 @@ void WordMatcher::push_batch(int port, fcb_WordMatcher* WordMatcher, PacketBatch ...@@ -159,8 +155,11 @@ void WordMatcher::push_batch(int port, fcb_WordMatcher* WordMatcher, PacketBatch
} }
WordMatcher->counterRemoved += 1; WordMatcher->counterRemoved += 1;
} }
} while (_all && result == 1);
if (!_all)
break;
} while (result == 1 && iter.leftInChunk());
// While we keep finding complete insults in the packet // While we keep finding complete insults in the packet
if (result == 0) { //Finished in the middle of a potential match if (result == 0) { //Finished in the middle of a potential match
if(!isLastUsefulPacket(flow->tail())) { if(!isLastUsefulPacket(flow->tail())) {
......
...@@ -70,16 +70,6 @@ protected: ...@@ -70,16 +70,6 @@ protected:
virtual int maxModificationLevel() override; virtual int maxModificationLevel() override;
/** @brief Remove an insult in the web page stored in the buffer
* @param fcb Pointer to the FCB of the flow
* @param insult The insult to remove
* @return The result of the deletion (1: insult found and removed, -1 insult not found, 0
* insult not found but may start at the end of the last packet in the buffer)
*/
int removeInsult(struct fcb_WordMatcher* fcb, const char *insult);
per_thread<MemoryPool<struct flowBufferEntry>> poolBufferEntries;
Vector<String> insults; // Vector containing the words to remove from the web pages Vector<String> insults; // Vector containing the words to remove from the web pages
bool closeAfterInsults; bool closeAfterInsults;
......
...@@ -157,7 +157,7 @@ public: ...@@ -157,7 +157,7 @@ public:
* @param start A content iterator pointing to the first byte to remove * @param start A content iterator pointing to the first byte to remove
* @param length The number of bytes to remove * @param length The number of bytes to remove
*/ */
void remove(FlowBufferContentIter start, uint32_t length, StackElement* owner); void remove(const FlowBufferContentIter start, uint32_t length, StackElement* owner);
private: private:
...@@ -274,15 +274,29 @@ public: ...@@ -274,15 +274,29 @@ public:
return entry->next() == 0; return entry->next() == 0;
} }
inline bool moveToNextChunk() { inline void moveToNextChunk() {
entry = entry->next(); entry = entry->next();
offsetInPacket = 0; offsetInPacket = 0;
} }
inline int leftInChunk() { inline int leftInChunk() {
if (!entry)
return 0;
return entry->length() - (entry->getContentOffset() + offsetInPacket); return entry->length() - (entry->getContentOffset() + offsetInPacket);
} }
void print_ascii() {
FlowBufferContentIter n = *this;
while (n) {
char buf[n.leftInChunk() + 1];
memcpy(buf, n.get_ptr(), n.leftInChunk());
buf[n.leftInChunk()] = '\0';
click_chatter("'%s'",buf);
n.moveToNextChunk();
}
}
private: private:
/** @brief Repair a FlowBufferContentIter. After a deletion at the end of a packet, /** @brief Repair a FlowBufferContentIter. After a deletion at the end of a packet,
* the iterator may point after the new content of the packet and thus have an invalid position. * the iterator may point after the new content of the packet and thus have an invalid position.
......
...@@ -412,8 +412,8 @@ ByteStreamMaintainer::~ByteStreamMaintainer() ...@@ -412,8 +412,8 @@ ByteStreamMaintainer::~ByteStreamMaintainer()
{ {
if(initialized) if(initialized)
{ {
RBTreeDestroy(treeAck); //RBTreeDestroy(treeAck); TODO
RBTreeDestroy(treeSeq); //RBTreeDestroy(treeSeq); TODO
} }
} }
......
...@@ -177,22 +177,25 @@ FlowBufferIter FlowBuffer::end() ...@@ -177,22 +177,25 @@ FlowBufferIter FlowBuffer::end()
FlowBufferContentIter FlowBuffer::searchSSE(FlowBufferContentIter start, const char* needle, const int pattern_length, FlowBufferContentIter FlowBuffer::searchSSE(FlowBufferContentIter start, const char* needle, const int pattern_length,
int *feedback) { int *feedback) {
int n = start.leftInChunk();
//If the first chunk is small, don't bother SSE
if (n < 32)
return search(start,needle,feedback);
const __m256i first = _mm256_set1_epi8(needle[0]); const __m256i first = _mm256_set1_epi8(needle[0]);
const __m256i last = _mm256_set1_epi8(needle[pattern_length - 1]); const __m256i last = _mm256_set1_epi8(needle[pattern_length - 1]);
assert(start.entry);
unsigned char* s = start.get_ptr(); unsigned char* s = start.get_ptr();
int n = start.leftInChunk();
int i = 0; int i = 0;
*feedback = -1; *feedback = -1;
FlowBufferContentIter next; FlowBufferContentIter next = start;
next.moveToNextChunk();
__m256i eq_first;
while (true) { while (true) {
//click_chatter("Search at %d/%d",i,n);
const __m256i block_first = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + i)); const __m256i block_first = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + i));
const __m256i block_last = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + i + pattern_length - 1)); const __m256i block_last = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(s + i + pattern_length - 1));
const __m256i eq_first = _mm256_cmpeq_epi8(first, block_first); eq_first = _mm256_cmpeq_epi8(first, block_first);
const __m256i eq_last = _mm256_cmpeq_epi8(last, block_last); const __m256i eq_last = _mm256_cmpeq_epi8(last, block_last);
uint32_t mask = _mm256_movemask_epi8(_mm256_and_si256(eq_first, eq_last)); uint32_t mask = _mm256_movemask_epi8(_mm256_and_si256(eq_first, eq_last));
...@@ -212,7 +215,14 @@ FlowBufferContentIter FlowBuffer::searchSSE(FlowBufferContentIter start, const c ...@@ -212,7 +215,14 @@ FlowBufferContentIter FlowBuffer::searchSSE(FlowBufferContentIter start, const c
i += 32; i += 32;
if (i + 32 + pattern_length >= n) { if (i + 32 + pattern_length >= n) {
if (start.lastChunk()) { if (start.lastChunk()) {
start += i - pattern_length; //We have to check if a pattern could start in the last part
uint32_t z = _mm256_movemask_epi8(eq_first);
if (z != 0) {
start += min(n - pattern_length, i - pattern_length);
} else {
start += i - pattern_length;
}
//click_chatter("searching now!"); //click_chatter("searching now!");
return search(start,needle,feedback); return search(start,needle,feedback);
} else { } else {
...@@ -236,6 +246,9 @@ FlowBufferContentIter FlowBuffer::searchSSE(FlowBufferContentIter start, const c ...@@ -236,6 +246,9 @@ FlowBufferContentIter FlowBuffer::searchSSE(FlowBufferContentIter start, const c
} }
} }
} }
//Should be unreachable
assert(false);
*feedback = -1; *feedback = -1;
return contentEnd(); return contentEnd();
} }
...@@ -347,7 +360,7 @@ int FlowBuffer::removeInFlow(const char* pattern, StackElement* owner) ...@@ -347,7 +360,7 @@ int FlowBuffer::removeInFlow(const char* pattern, StackElement* owner)
return 1; return 1;
} }
void FlowBuffer::remove(FlowBufferContentIter start, uint32_t length, StackElement* owner) void FlowBuffer::remove(const FlowBufferContentIter start, uint32_t length, StackElement* owner)
{ {
assert(length > 0); assert(length > 0);
uint32_t toRemove = length; uint32_t toRemove = length;
......
...@@ -910,7 +910,7 @@ Packet::clone(bool fast) ...@@ -910,7 +910,7 @@ Packet::clone(bool fast)
Packet *p = new WritablePacket; // no initialization Packet *p = new WritablePacket; // no initialization
# endif # endif
if (!p) if (!p)
return 0; return 0;
if (unlikely(fast)) { if (unlikely(fast)) {
memcpy(p, this, sizeof(Packet)); memcpy(p, this, sizeof(Packet));
p->_use_count = 1; p->_use_count = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment