/*
 * ipoib_cm_handle_rx_wc_rss() - handle one receive work completion on a
 * connected-mode (CM) QP in the RSS (multi receive ring) variant of IPoIB.
 *
 * @dev: the IPoIB net_device the completion belongs to
 * @wc:  the IB work completion describing the finished receive
 *
 * The payload is either copied into a small freshly allocated skb (for
 * packets shorter than IPOIB_CM_COPYBREAK) or handed up in the original
 * ring skb after a replacement buffer has been allocated; in both cases
 * the ring slot is reposted to the HCA at the end.
 */
void ipoib_cm_handle_rx_wc_rss(struct net_device *dev, struct ib_wc *wc)
{
	struct ipoib_dev_priv *priv = ipoib_priv(dev);
	struct ipoib_cm_rx_buf *rx_ring;
	/* Strip the CM/RECV flag bits to recover the ring index. */
	unsigned int wr_id = wc->wr_id & ~(IPOIB_OP_CM | IPOIB_OP_RECV);
	struct sk_buff *skb, *newskb;
	struct ipoib_cm_rx *p;
	unsigned long flags;
	u64 mapping[IPOIB_CM_RX_SG];
	int frags;
	int has_srq;
	struct sk_buff *small_skb;

	ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
		       wr_id, wc->status);

	if (unlikely(wr_id >= priv->recvq_size)) {
		/*
		 * An out-of-range id is either the special drain WR
		 * (meaning every WR posted before it has now completed)
		 * or a genuinely bogus completion.
		 */
		if (wr_id == (IPOIB_CM_RX_DRAIN_WRID & ~(IPOIB_OP_CM | IPOIB_OP_RECV))) {
			spin_lock_irqsave(&priv->lock, flags);
			/* Fully drained connections are now safe to reap ... */
			list_splice_init(&priv->cm.rx_drain_list, &priv->cm.rx_reap_list);
			/* ... and the next batch of QPs can start draining. */
			ipoib_cm_start_rx_drain(priv);
			queue_work(priv->wq, &priv->cm.rx_reap_task);
			spin_unlock_irqrestore(&priv->lock, flags);
		} else
			ipoib_warn(priv, "cm recv completion event with wrid %d (> %d)\n",
				   wr_id, priv->recvq_size);
		return;
	}

	/* Per-connection RX state stashed in the QP at connection setup. */
	p = wc->qp->qp_context;

	/* With an SRQ all connections share one receive ring; otherwise
	 * each connection owns a private ring. */
	has_srq = ipoib_cm_has_srq(dev);
	rx_ring = has_srq ? priv->cm.srq_ring : p->rx_ring;

	skb = rx_ring[wr_id].skb;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		ipoib_dbg(priv, "cm recv error "
			  "(status=%d, wrid=%d vend_err %x)\n",
			  wc->status, wr_id, wc->vendor_err);
		/*
		 * NOTE(review): p is dereferenced unconditionally here but
		 * NULL-checked on the UPDATE_MASK path below -- confirm
		 * qp_context can never be NULL on an error completion.
		 */
		++priv->recv_ring[p->index].stats.rx_dropped;
		if (has_srq)
			/* SRQ buffers are shared: just hand this one back. */
			goto repost;
		else {
			/*
			 * Without an SRQ the errored WR is not reposted;
			 * once the last outstanding WR of this connection
			 * completes, hand the connection to the reaper.
			 */
			if (!--p->recv_count) {
				spin_lock_irqsave(&priv->lock, flags);
				list_move(&p->list, &priv->cm.rx_reap_list);
				spin_unlock_irqrestore(&priv->lock, flags);
				queue_work(priv->wq, &priv->cm.rx_reap_task);
			}
			return;
		}
	}

	/* Periodically (when the low bits of the id hit zero) refresh the
	 * connection's timestamp so idle passive connections can be aged
	 * out elsewhere. */
	if (unlikely(!(wr_id & IPOIB_CM_RX_UPDATE_MASK))) {
		if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
			spin_lock_irqsave(&priv->lock, flags);
			p->jiffies = jiffies;
			/* Move this entry to list head, but do not re-add it
			 * if it has been moved out of list. */
			if (p->state == IPOIB_CM_RX_LIVE)
				list_move(&p->list, &priv->cm.passive_ids);
			spin_unlock_irqrestore(&priv->lock, flags);
		}
	}

	/*
	 * Copybreak: for small packets it is cheaper to copy the payload
	 * into a fresh skb and repost the original (still DMA-mapped)
	 * buffer than to allocate a full replacement receive buffer.
	 */
	if (wc->byte_len < IPOIB_CM_COPYBREAK) {
		int dlen = wc->byte_len;

		small_skb = dev_alloc_skb(dlen + IPOIB_CM_RX_RESERVE);
		if (small_skb) {
			skb_reserve(small_skb, IPOIB_CM_RX_RESERVE);
			/* Give the CPU ownership of the buffer around the copy. */
			ib_dma_sync_single_for_cpu(priv->ca, rx_ring[wr_id].mapping[0],
						   dlen, DMA_FROM_DEVICE);
			skb_copy_from_linear_data(skb, small_skb->data, dlen);
			ib_dma_sync_single_for_device(priv->ca, rx_ring[wr_id].mapping[0],
						      dlen, DMA_FROM_DEVICE);
			skb_put(small_skb, dlen);
			skb = small_skb;
			goto copied;
		}
		/* Allocation failed: fall through to the full-buffer path. */
	}

	/* Number of page fragments needed beyond the linear head. */
	frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
					      (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;

	/* Allocate the replacement buffer BEFORE unmapping the old one, so
	 * that on failure the old buffer can simply be reposted. */
	newskb = ipoib_cm_alloc_rx_skb(dev, rx_ring, wr_id, frags,
				       mapping, GFP_ATOMIC);
	if (unlikely(!newskb)) {
		/*
		 * If we can't allocate a new RX buffer, dump
		 * this packet and reuse the old buffer.
		 */
		ipoib_dbg(priv, "failed to allocate receive buffer %d\n", wr_id);
		++priv->recv_ring[p->index].stats.rx_dropped;
		goto repost;
	}

	/* The old buffer now carries the packet; record the fresh mapping
	 * for the replacement skb in the ring slot. */
	ipoib_cm_dma_unmap_rx(priv, frags, rx_ring[wr_id].mapping);
	memcpy(rx_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);

	ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
		       wc->byte_len, wc->slid);

	skb_put_frags(skb, IPOIB_CM_HEAD_SIZE, wc->byte_len, newskb);

copied:
	skb->protocol = ((struct ipoib_header *) skb->data)->proto;
	skb_add_pseudo_hdr(skb);

	/* Account on the per-ring statistics of this RSS receive ring. */
	++priv->recv_ring[p->index].stats.rx_packets;
	priv->recv_ring[p->index].stats.rx_bytes += skb->len;

	skb->dev = dev;
	/* XXX get correct PACKET_ type here */
	skb->pkt_type = PACKET_HOST;
	netif_receive_skb(skb);

repost:
	/* Return the ring slot to the HCA so it can receive again. */
	if (has_srq) {
		if (unlikely(ipoib_cm_post_receive_srq_rss(dev,
							   p->index,
							   wr_id)))
			ipoib_warn(priv, "ipoib_cm_post_receive_srq_rss failed "
				   "for buf %d\n", wr_id);
	} else {
		if (unlikely(ipoib_cm_post_receive_nonsrq_rss(dev, p,
							      wr_id))) {
			/* The WR never reached the hardware; drop the count
			 * so the reap logic stays balanced. */
			--p->recv_count;
			ipoib_warn(priv, "ipoib_cm_post_receive_nonsrq_rss failed "
				   "for buf %d\n", wr_id);
		}
	}
}
这个函数 ipoib_cm_handle_rx_wc_rss 是处理 IP over InfiniBand (IPoIB) 在连通模式(Connected Mode, CM)下收到的网络包的函数。它利用了RSS功能来实现多核CPU处理网络流量的分发。在InfiniBand网络中,Completion Work Queues(CQ)包含用来标识网络操作完成情况的工作完成结构(Work Completion, WC)。当一个接收操作完成后,会调用这个函数来处理WC。以下是函数的详细描述:
1. 函数接收两个参数:
   - *dev: 指向`net_device`结构体的指针,与InfiniBand设备相关联。
   - *wc: 指向`ib_wc` (IB Work Completion)结构体的指针,它包含了完成接收操作的状态信息。
2. 函数首先通过调用`ipoib_priv(dev)`获取到针对`net_device`的私有IPoIB结构(ipoib_dev_priv)的指针。
3. wr_id是工作请求的ID,通过将WC的`wr_id`字段与定义好的掩码进行逻辑与操作,去除特定的标记位(如 IPOIB_OP_CM 和 IPOIB_OP_RECV)。
4. 如果`wr_id`超出了接收队列(recvq_size)的大小范围:若它恰好是特殊的排干(drain)标记 IPOIB_CM_RX_DRAIN_WRID,则把 rx_drain_list 拼接到 rx_reap_list、启动下一轮排干并调度回收任务;否则打印一条警告信息。两种情况处理完后都直接返回。
5. wc->qp->qp_context获取到了与此WC相关的队列对(Queue Pair, QP)上下文,这通常是一个指向`ipoib_cm_rx`结构体的指针。
6. has_srq变量用来检测当前设备是否使用了共享接收队列(Shared Receive Queue, SRQ)。根据这个变量的值,`rx_ring`变量会指向不同的接收缓冲区。
7. 接下来,根据`wr_id`在接收缓冲区中找到与此WC关联的socket缓冲区(sk_buff,简称skb)。
8. 如果`wc->status`不等于`IB_WC_SUCCESS`,表示接收操作出错,它会递增丢包计数,并根据是否使用SRQ采取不同的行动。
9. 如果`wr_id`的低位满足 IPOIB_CM_RX_UPDATE_MASK 条件,且距上次更新已超过 IPOIB_CM_RX_UPDATE_TIME,则刷新该连接的时间戳,并在其状态为 IPOIB_CM_RX_LIVE 时把对应的`ipoib_cm_rx`结构移动到 passive_ids 列表头部,防止活跃连接被当作闲置连接回收。
10. 如果接收到的包的长度小于`IPOIB_CM_COPYBREAK`(一种阈值),函数会尝试分配一个新的skb来拷贝数据,而不是用分散/聚集(scatter-gather)机制。
11. 如果数据包较长,则会尝试为其分配一个新的skb,并重新映射内存,保证足够的空间来接收数据。
12. 在成功接收数据包后,函数设置skb的协议类型,并添加伪头部(pseudo header)之后,通过 netif_receive_skb 函数将skb传递给网络子系统进一步处理。
13. 在最后的`repost`标签处,根据是否使用SRQ,把缓冲区重新投递给硬件,以便可以接收更多的数据包。
函数最后的操作是通过 netif_receive_skb 进行网络接收,这个函数的作用是将接收到的数据包交给上层网络协议栈处理。