通过前面的分析,对DPDK中对报文处理的过程有了一个初步的认知。从一个更高层次来看,传统的网络通信一般会通过上层应用、操作系统、网卡驱动和硬件四层。再往下,基本就不属于于计算机控制的系统了。
早期的应用,基本上这四个层次都比较单纯,所以也没啥复杂的管理方式,你来我往,你应我答这事就成了。可世风不古,上层应用的快速发展,使得底层的压力骤增。软件和硬件的发展是相互影响的,软件新需求的提出,硬件会在一个阶段后满足,即使无法通过正常的技术迭代满足,也可以使用其它方式。比如CPU的核心频率提高成本太高,那么并行技术(多核或多CPU)就出现了。明白了这些,再向下看DPDK中对网络中数据的管理就好理解了。
从大方向上看,DPDK只是省略了其中一些步骤,并不是完全重构了一套理论逻辑,所以在DPDK中,基本应用的仍然是成熟的理论和技术,也就是常说的工程创新。
CPU核数的增加,其实就代表了多任务可以并行进行。CPU是干啥的?处理数据的。所以为了对应多个核,那么是不是应该有多个数据队列来供使用?这就是多队列技术。同样为了更好的处理多队列,OS的底层也要对其进行支持,毕竟你不能让每个上层应用自己都写一个多队列来处理和网络的通信。
目前基本网卡本身都支持了多队列技术,DPDK(或内核本身)也提供了多队列技术,这样就涉及到了一个问题,这两个多队列是如何进行应对的?如何应对才能达到最好的效果?这才是重点。
在前面的分析中可以发现,DPDK中的Pack I/O是天然支持多队列技术的。通过一系列的配置,可以绑定与CPU核心、队列等。而网卡报文的队列分配一般是使用RSS和Flow Director技术。RSS其实就是哈希均匀分配到队列,而后者则是使用查找的精确匹配方式将包发送到指定队列。另外还需要注意的是优先给,毕竟有的包命令是处理一些控制状态的,需要优先处理,这都需要在包分配时有一些应对机制。
为了实现多队列和提到的优先级,就需要对包的种类进行分类。比如常见的TCP、IP以及自定义或者云等虚拟包等。有两种处理的方式即上面提到的RSS和Flow Director。
1、RSS(Receive-Side Scaling),接收方向扩展
负载均衡,这个光看名字都知道什么意思,特别是这些年来开发技术的普及,基本上搞后台搞网络的都懂这个技术。简单来说就是不分彼此,不厚此薄彼,每个队列享受相同的待遇。这里面的重点是哈希计算的对应值怎么取?可以是网络四元组也可以是其它,这个根据实际情况来定。
2、Flow Director
精确字段匹配,可以将报文数据转到相应的队列中。在网卡上会有一个表,表中会有相关的关键字和动作。此表由驱动来进行管理,每当数据从网卡进入,就可以通过查询此表来进行相关的动作。
这种工作方式比RSS更有数据的灵活性,可以根据实际情况自定义相关的特定需求。
3、QoS-服务质量
其实就是对于些特定的业务需求进行调度,划分类型并确定优先级。
4、虚拟场景下的分类
现在的实际业务中,云虚拟化的业务占有非常大的比重,所以会有一组队列与SRIOV VF/VMDQ POOL等相对应进行数据处理。
5、流过滤
主要是通过分类进行合法性验证。过滤的方法有MAC地址过滤、VLAN标签过滤和管理数据包过滤。当然,在不同的网卡上也提供了多种规则供调用,它们包括:N tuple filter(N元组指定队列)、EtherType Filter(以太网报文的EtherType指定队列)、Cloud Filter(云应用中的VXLAN等隧道报文指定队列)等。
如果实现流分类,那么必须明白流是哪种类型,可这种类型从哪里得到呢?就需要解析数据报文的头,而在网卡中可以使用接收描述符来快速确定包的类型。在DPDK中的m_buf中定义有相关的对应字段来映射类型。
struct rte_mbuf
{
...
/*
* The packet type, which is the combination of outer/inner L2, L3, L4
* and tunnel types. The packet_type is about data really present in the
* mbuf. Example: if vlan stripping is enabled, a received vlan packet
* would have RTE_PTYPE_L2_ETHER and not RTE_PTYPE_L2_VLAN because the
* vlan is stripped from the data.
*/
RTE_STD_C11
union {
uint32_t packet_type; /**< L2/L3/L4 and tunnel information. */
struct {
uint32_t l2_type:4; /**< (Outer) L2 type. */
uint32_t l3_type:4; /**< (Outer) L3 type. */
uint32_t l4_type:4; /**< (Outer) L4 type. */
uint32_t tun_type:4; /**< Tunnel type. */
RTE_STD_C11
union {
uint8_t inner_esp_next_proto;
/**< ESP next protocol type, valid if
* RTE_PTYPE_TUNNEL_ESP tunnel type is set
* on both Tx and Rx.
*/
__extension__
struct {
uint8_t inner_l2_type:4;
/**< Inner L2 type. */
uint8_t inner_l3_type:4;
/**< Inner L3 type. */
};
};
uint32_t inner_l4_type:4; /**< Inner L4 type. */
};
};
...
};
有这个类型的处理就可以动态的快速把报文转到相关队列。不过需要注意的是,虽然主流的网卡都支持多队列,但并不是所有的都支持此功能。同样,即使支持,支持的程度也有不同。
DPDK中遇到的场景非常复杂,可能有的需要数据?发,有的需要命令处理有的需要逻辑转发等等不一而足。那么,对数据队列就可以动态的根据实际情况来进行定制。DPDK可以通过RSS实现负载均衡,然后再通过Flow Director实现报文的定向分配,通过绑定的专门的核心进行特定的数据队列处理。
另外DPDK中还可以通过虚拟化网卡来进行工作,实现动态的资源利用。
先看一下相关的配置数据结构体定义:
//\dpdk-stable-19.11.14\lib\librte_ethdev
/**
* A structure used to configure an Ethernet port.
* Depending upon the RX multi-queue mode, extra advanced
* configuration settings may be needed.
*/
struct rte_eth_conf {
uint32_t link_speeds; /**< bitmap of ETH_LINK_SPEED_XXX of speeds to be
used. ETH_LINK_SPEED_FIXED disables link
autonegotiation, and a unique speed shall be
set. Otherwise, the bitmap defines the set of
speeds to be advertised. If the special value
ETH_LINK_SPEED_AUTONEG (0) is used, all speeds
supported are advertised. */
struct rte_eth_rxmode rxmode; /**< Port RX configuration. */
struct rte_eth_txmode txmode; /**< Port TX configuration. */
uint32_t lpbk_mode; /**< Loopback operation mode. By default the value
is 0, meaning the loopback mode is disabled.
Read the datasheet of given ethernet controller
for details. The possible values of this field
are defined in implementation of each driver. */
struct {
struct rte_eth_rss_conf rss_conf; /**< Port RSS configuration */
struct rte_eth_vmdq_dcb_conf vmdq_dcb_conf;
/**< Port vmdq+dcb configuration. */
struct rte_eth_dcb_rx_conf dcb_rx_conf;
/**< Port dcb RX configuration. */
struct rte_eth_vmdq_rx_conf vmdq_rx_conf;
/**< Port vmdq RX configuration. */
} rx_adv_conf; /**< Port RX filtering configuration. */
union {
struct rte_eth_vmdq_dcb_tx_conf vmdq_dcb_tx_conf;
/**< Port vmdq+dcb TX configuration. */
struct rte_eth_dcb_tx_conf dcb_tx_conf;
/**< Port dcb TX configuration. */
struct rte_eth_vmdq_tx_conf vmdq_tx_conf;
/**< Port vmdq TX configuration. */
} tx_adv_conf; /**< Port TX DCB configuration (union). */
/** Currently,Priority Flow Control(PFC) are supported,if DCB with PFC
is needed,and the variable must be set ETH_DCB_PFC_SUPPORT. */
uint32_t dcb_capability_en;
struct rte_fdir_conf fdir_conf; /**< FDIR configuration. DEPRECATED */
struct rte_intr_conf intr_conf; /**< Interrupt mode configuration. */
};
在这个结构定义中,可以看到intr_conf、fdir_conf和rx_adv_conf以及rxmode。而fdir_conf和rx_adv_conf就涉及Flow Director和RSS的配置。RSS分为三种即单一队列的RSS、带有硬件队列的RSS和还有消息中断的RSS(其实还是硬件不匹配造成的)。
在DPDK提供的例程l3fwd中,可以看到相关的配置代码:
int
main(int argc, char **argv)
{
...
//配置相关的RSS或FD
struct rte_eth_conf local_port_conf = port_conf;
/* skip ports that are not enabled */
if ((enabled_port_mask & (1 << portid)) == 0) {
printf("\nSkipping disabled port %d\n", portid);
continue;
}
/* init port */
printf("Initializing port %d ... ", portid );
fflush(stdout);
nb_rx_queue = get_port_n_rx_queues(portid);
n_tx_queue = nb_lcores;
if (n_tx_queue > MAX_TX_QUEUE_PER_PORT)
n_tx_queue = MAX_TX_QUEUE_PER_PORT;
printf("Creating queues: nb_rxq=%d nb_txq=%u... ",
nb_rx_queue, (unsigned)n_tx_queue );
ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0)
rte_exit(EXIT_FAILURE,
"Error during getting device (port %u) info: %s\n",
portid, strerror(-ret));
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
local_port_conf.txmode.offloads |=
DEV_TX_OFFLOAD_MBUF_FAST_FREE;
local_port_conf.rx_adv_conf.rss_conf.rss_hf &=
dev_info.flow_type_rss_offloads;
if (local_port_conf.rx_adv_conf.rss_conf.rss_hf !=
port_conf.rx_adv_conf.rss_conf.rss_hf) {
printf("Port %u modified RSS hash function based on hardware support,"
"requested:%#"PRIx64" configured:%#"PRIx64"\n",
portid,
port_conf.rx_adv_conf.rss_conf.rss_hf,
local_port_conf.rx_adv_conf.rss_conf.rss_hf);
}
ret = rte_eth_dev_configure(portid, nb_rx_queue,
(uint16_t)n_tx_queue, &local_port_conf);
// 配置收发队列
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
if (rte_lcore_is_enabled(lcore_id) == 0)
continue;
if (numa_on)
socketid =
(uint8_t)rte_lcore_to_socket_id(lcore_id);
else
socketid = 0;
printf("txq=%u,%d,%d ", lcore_id, queueid, socketid);
fflush(stdout);
txconf = &dev_info.default_txconf;
txconf->offloads = local_port_conf.txmode.offloads;
ret = rte_eth_tx_queue_setup(portid, queueid, nb_txd,
socketid, txconf);
if (ret < 0)
rte_exit(EXIT_FAILURE,
"rte_eth_tx_queue_setup: err=%d, "
"port=%d\n", ret, portid);
qconf = &lcore_conf[lcore_id];
qconf->tx_queue_id[portid] = queueid;
queueid++;
qconf->tx_port_id[qconf->n_tx_port] = portid;
qconf->n_tx_port++;
}
...
for(queue = 0; queue < qconf->n_rx_queue; ++queue) {
struct rte_eth_rxconf rxq_conf;
portid = qconf->rx_queue_list[queue].port_id;
queueid = qconf->rx_queue_list[queue].queue_id;
if (numa_on)
socketid =
(uint8_t)rte_lcore_to_socket_id(lcore_id);
else
socketid = 0;
printf("rxq=%d,%d,%d ", portid, queueid, socketid);
fflush(stdout);
ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0)
rte_exit(EXIT_FAILURE,
"Error during getting device (port %u) info: %s\n",
portid, strerror(-ret));
rxq_conf = dev_info.default_rxconf;
rxq_conf.offloads = port_conf.rxmode.offloads;
if (!per_port_pool)
ret = rte_eth_rx_queue_setup(portid, queueid,
nb_rxd, socketid,
&rxq_conf,
pktmbuf_pool[0][socketid]);
else
ret = rte_eth_rx_queue_setup(portid, queueid,
nb_rxd, socketid,
&rxq_conf,
pktmbuf_pool[portid][socketid]);
if (ret < 0)
rte_exit(EXIT_FAILURE,
"rte_eth_rx_queue_setup: err=%d, port=%d\n",
ret, portid);
}
//启动设备
/* Start device */
ret = rte_eth_dev_start(portid);
...
}
启动设备的函数在rte_ethdev.c:
int
rte_eth_dev_start(uint16_t port_id)
{
struct rte_eth_dev *dev;
struct rte_eth_dev_info dev_info;
int diag;
int ret;
RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, -EINVAL);
dev = &rte_eth_devices[port_id];
RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->dev_start, -ENOTSUP);
if (dev->data->dev_started != 0) {
RTE_ETHDEV_LOG(INFO,
"Device with port_id=%"PRIu16" already started\n",
port_id);
return 0;
}
ret = rte_eth_dev_info_get(port_id, &dev_info);
if (ret != 0)
return ret;
/* Lets restore MAC now if device does not support live change */
if (*dev_info.dev_flags & RTE_ETH_DEV_NOLIVE_MAC_ADDR)
rte_eth_dev_mac_restore(dev, &dev_info);
diag = (*dev->dev_ops->dev_start)(dev);
if (diag == 0)
dev->data->dev_started = 1;
else
return eth_err(port_id, diag);
ret = rte_eth_dev_config_restore(dev, &dev_info, port_id);
if (ret != 0) {
RTE_ETHDEV_LOG(ERR,
"Error during restoring configuration for device (port %u): %s\n",
port_id, rte_strerror(-ret));
rte_eth_dev_stop(port_id);
return ret;
}
if (dev->data->dev_conf.intr_conf.lsc == 0) {
RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->link_update, -ENOTSUP);
(*dev->dev_ops->link_update)(dev, 0);
}
return 0;
}
在DPDK使用RSS:
static struct rte_eth_conf port_conf = {
.rxmode = {
.mq_mode = ETH_MQ_RX_RSS,
.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
.split_hdr_size = 0,
.offloads = DEV_RX_OFFLOAD_CHECKSUM,
},
.rx_adv_conf = {
.rss_conf = {
.rss_key = NULL,
.rss_hf = ETH_RSS_IP,
},
},
.txmode = {
.mq_mode = ETH_MQ_TX_NONE,
},
};
使用Flow Director:
//\app\test-pmd
//cmd_flow_director_filter_parsed函数中
struct rte_eth_fdir_filter entry;
...
/* set to report FD ID by default */
entry.action.report_status = RTE_ETH_FDIR_REPORT_ID;
entry.action.rx_queue = res->queue_id;
entry.soft_id = res->fd_id_value;
if (!strcmp(res->ops, "add"))
ret = rte_eth_dev_filter_ctrl(res->port_id, RTE_ETH_FILTER_FDIR,
RTE_ETH_FILTER_ADD, &entry);
else if (!strcmp(res->ops, "del"))
ret = rte_eth_dev_filter_ctrl(res->port_id, RTE_ETH_FILTER_FDIR,
RTE_ETH_FILTER_DELETE, &entry);
else
ret = rte_eth_dev_filter_ctrl(res->port_id, RTE_ETH_FILTER_FDIR,
RTE_ETH_FILTER_UPDATE, &entry);
涉及到硬件和OS,就需要看相关的厂商提供的文档和OS底层的配置。这里面有一个问题,特别是在硬件上,有些问题不是问题,而是规则。这个规则是否合理,就需要一个较长的时间来验证然后才可能更正。所以在DPDK这种和硬件亲和性较强的开发套件使用中,要有一个清醒的认识,不能一味的追求为什么?而到了规则的地方就按它的规则走就可以了。这不像纯软件,条条大路通罗马,实在不行可以硬来一下。