<-- Home

DPDK简单收发包程序示例

DPDK是用于报文快速处理的库和驱动程序集,因为它比较接近底层,配置和使用略麻烦。用户越看不见的东西越复杂,也越值得我们去研究。在此记录一个简单DPDK网络底层应用程序的实现。

该程序实现的功能是将它收到的包发回给源,也就是说我们只需交换包中的目的mac地址和源mac地址。程序如下:

#include <inttypes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_mbuf.h>


// Number of mbufs requested per port; main() multiplies this by the port count
// when it creates the packet buffer pool.
#define NUM_MBUFS 8191
// Cache size argument passed to rte_pktmbuf_pool_create().
#define MBUF_CACHE_SIZE 250

// Descriptor counts passed to the RX and TX queue setup calls in port_init().
#define RX_RING_SIZE 512
#define TX_RING_SIZE 512

// Maximum number of packets requested from rte_eth_rx_burst() in one call.
#define BURST_SIZE 32

// Set to true by signal_handler() on SIGINT/SIGTERM; lcore_main() polls it to
// decide when to leave its forwarding loop.
static volatile bool force_quit;

// Log type behind the "APP" tag used in this file's RTE_LOG() calls.
#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1


// 输出设备的mac地址
static void
print_mac(unsigned int port_id)
{
  struct ether_addr dev_eth_addr;
  rte_eth_macaddr_get(port_id, &dev_eth_addr);
    printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n",
                    (unsigned int) port_id,
                    dev_eth_addr.addr_bytes[0],
                    dev_eth_addr.addr_bytes[1],
                    dev_eth_addr.addr_bytes[2],
                    dev_eth_addr.addr_bytes[3],
                    dev_eth_addr.addr_bytes[4],
                    dev_eth_addr.addr_bytes[5]);
}

// 将pkt中的源mac地址和目的mac地址交换
static void
mac_swap(struct rte_mbuf **bufs, uint16_t nb_mbufs)
{
  struct ether_hdr *eth;
  struct ether_addr tmp;
  struct rte_mbuf *m;
  uint16_t buf;

  for (buf = 0; buf < nb_mbufs; buf++) {
    m = bufs[buf];
    eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
    ether_addr_copy(&eth->s_addr, &tmp);
    ether_addr_copy(&eth->d_addr, &eth->s_addr);
    ether_addr_copy(&tmp, &eth->d_addr);
  }
}

// Check the link state of ports 0 .. nb_ports-1 and log each result.
//
// nb_ports: number of ports to check.
//
// Returns 0 when every checked port reports link up. Returns -1 as soon as a
// port reports link down; ports after it are not checked.
static int
check_link_status(uint16_t nb_ports)
{
  struct rte_eth_link link;
  // Same width as nb_ports: a narrower counter would wrap around and never
  // reach a bound larger than 255.
  uint16_t port;

  for (port = 0; port < nb_ports; port++) {
    rte_eth_link_get(port, &link);

    if (link.link_status == ETH_LINK_DOWN) {
      RTE_LOG(INFO, APP, "Port: %u Link DOWN\n", port);
      return -1;
    }

    RTE_LOG(INFO, APP, "Port: %u Link UP Speed %u\n",
      port, link.link_speed);
  }

  return 0;
}

// 在程序运行结束后统计收发包信息
static void
print_stats(void)
{
  struct rte_eth_stats stats;
  uint8_t nb_ports = rte_eth_dev_count();
  uint8_t port;

  for (port = 0; port < nb_ports; port++) {
    printf("\nStatistics for port %u\n", port);
    rte_eth_stats_get(port, &stats);
    printf("Rx:%9"PRIu64" Tx:%9"PRIu64" dropped:%9"PRIu64"\n",
      stats.ipackets, stats.opackets, stats.imissed);
  }
}

// Signal handler for SIGINT / SIGTERM: announces the signal, raises the
// force_quit flag so the forwarding loops stop, and prints port statistics.
// Any other signal number is ignored.
//
// NOTE(review): printf() and print_stats() are not async-signal-safe; a
// stricter design would only set the flag here and print from main().
static void
signal_handler(int signum)
{
  if (signum != SIGINT && signum != SIGTERM)
    return;

  printf("\n\nSignal %d received, preparing to exit...\n",
      signum);
  force_quit = true;
  print_stats();
}

// Configure and start one Ethernet port with a single RX queue and a single
// TX queue, then enable promiscuous mode on it.
//
// port:      identifier of the port to initialise.
// mbuf_pool: mempool the RX queue takes its packet buffers from.
//
// Returns 0 on success, or the error code of the first DPDK call that failed.
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
  const uint16_t nb_rx_queues = 1;
  const uint16_t nb_tx_queues = 1;
  struct rte_eth_conf port_conf;
  int ret;
  uint16_t q;

  // Zero the whole configuration before setting individual fields: the
  // structure is an automatic variable, so every field not assigned below
  // would otherwise be handed to the driver with an indeterminate value.
  memset(&port_conf, 0, sizeof(port_conf));
  port_conf.link_speeds = ETH_LINK_SPEED_1G;
  port_conf.rxmode.max_rx_pkt_len = ETHER_MAX_LEN;

  // Configure the device.
  ret = rte_eth_dev_configure(port,
      nb_rx_queues,
      nb_tx_queues,
      &port_conf);
  if (ret != 0)
    return ret;

  // Set up the RX queues (NULL selects the driver's default queue config).
  for (q = 0; q < nb_rx_queues; q++) {
    ret = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
        rte_eth_dev_socket_id(port),
        NULL, mbuf_pool);
    if (ret < 0)
      return ret;
  }

  // Set up the TX queues (NULL selects the driver's default queue config).
  for (q = 0; q < nb_tx_queues; q++) {
    ret = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
        rte_eth_dev_socket_id(port),
        NULL);
    if (ret < 0)
      return ret;
  }

  // Start the device.
  ret = rte_eth_dev_start(port);
  if (ret < 0)
    return ret;

  // Enable promiscuous mode on the port.
  rte_eth_promiscuous_enable(port);

  return 0;
}

// Per-lcore worker loop: until force_quit is set, poll RX queue 0 of every
// port, swap the MAC addresses of the received packets and send them back out
// of TX queue 0 of the same port. Packets the TX queue does not accept are
// freed.
//
// arg: unused.
//
// Always returns 0.
//
// NOTE(review): every lcore running this function polls queue 0 of every
// port; confirm that at most one lcore is launched, or give each lcore its
// own queue.
int lcore_main(void *arg)
{
  const unsigned int self = rte_lcore_id();
  const uint8_t port_count = rte_eth_dev_count();

  RTE_LOG(INFO, APP, "lcore %u running\n", self);

  while (!force_quit) {
    uint8_t p;

    for (p = 0; p < port_count; p++) {
      struct rte_mbuf *pkts[BURST_SIZE];
      uint16_t received;
      uint16_t sent;

      // Receive a burst of packets.
      received = rte_eth_rx_burst(p, 0, pkts, BURST_SIZE);
      if (unlikely(received == 0))
        continue;

      // Swap source and destination MAC addresses.
      mac_swap(pkts, received);

      // Send the packets back out.
      sent = rte_eth_tx_burst(p, 0, pkts, received);

      // Release whatever the TX queue did not take.
      while (sent < received)
        rte_pktmbuf_free(pkts[sent++]);
    }
  }

  RTE_LOG(INFO, APP, "lcore %u exiting\n", self);
  return 0;
}


// Program entry point: initialise the EAL, install the signal handlers,
// create the mbuf pool, initialise every port, launch lcore_main() on the
// slave lcores, wait for them to finish, then stop and close the ports.
//
// Returns 0 on a normal shutdown; any set-up failure terminates the process
// through rte_exit(EXIT_FAILURE, ...).
int main(int argc, char *argv[])
{
  int ret;
  uint8_t port;
  uint8_t nb_ports;
  struct rte_mempool *mbuf_pool;

  // Initialise DPDK.
  ret = rte_eal_init(argc, argv);
  if (ret < 0)
    rte_exit(EXIT_FAILURE, "EAL Init failed\n");

  // Skip the arguments consumed by the EAL.
  argc -= ret;
  argv += ret;

  // Register the signal handlers.
  force_quit = false;
  signal(SIGINT, signal_handler);
  signal(SIGTERM, signal_handler);

  nb_ports = rte_eth_dev_count();

  // Without this check a zero-sized pool would be requested below and the
  // failure would be reported as a pool creation problem.
  if (nb_ports == 0)
    rte_exit(EXIT_FAILURE, "No Ethernet ports available\n");

  for (port = 0; port < nb_ports; port++) {
    print_mac(port);
  }

  RTE_LOG(INFO, APP, "Number of ports:%u\n", nb_ports);

  // Allocate the mbuf pool, sized per port.
  mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
    NUM_MBUFS * nb_ports,
    MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
    rte_socket_id());

  if (mbuf_pool == NULL)
    rte_exit(EXIT_FAILURE, "mbuf_pool create failed\n");

  // Configure and start every port.
  for (port = 0; port < nb_ports; port++) {
    if (port_init(port, mbuf_pool) != 0)
      rte_exit(EXIT_FAILURE, "port init failed\n");
  }

  // Check the link state; a down link is reported but is not fatal.
  ret = check_link_status(nb_ports);
  if (ret < 0)
    RTE_LOG(WARNING, APP, "Some ports are down\n");

  // Run the forwarding loop on the slave lcores; the master lcore only waits.
  // NOTE(review): with a single lcore nothing polls the ports - confirm the
  // core mask used to start the program provides at least one slave lcore.
  rte_eal_mp_remote_launch(lcore_main, NULL, SKIP_MASTER);

  rte_eal_mp_wait_lcore();

  // Release the ports once the forwarding loops have exited.
  for (port = 0; port < nb_ports; port++) {
    rte_eth_dev_stop(port);
    rte_eth_dev_close(port);
  }

  return 0;
}

使用CMake来生成Makefile编译会方便不少

# cmake_minimum_required() has to be called before project() so that the
# version and policy settings are in effect when the project is set up.
cmake_minimum_required(VERSION 2.6)
project(sample)

# DPDK build directory, taken from the RTE_SDK and RTE_TARGET environment
# variables.
set(DPDK_BUILD_DIR "$ENV{RTE_SDK}/$ENV{RTE_TARGET}")

# Append to the existing flags instead of discarding them.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -msse4.2")

include_directories("${DPDK_BUILD_DIR}/include")
link_directories("${DPDK_BUILD_DIR}/lib")

add_executable(sample main.cpp)

# The DPDK library is wrapped in --whole-archive / --no-whole-archive so that
# the linker keeps all of its objects.
target_link_libraries(
    sample
    -Wl,--whole-archive
    -ldpdk
    -Wl,--no-whole-archive
    -lrt
    -lm
    -ldl
    -lcrypto
    -pthread
)

为了测试该程序的功能,我们需要自己组装报文发送给DPDK托管的网卡。可以使用RAW SOCKET原始套接字实现该功能。

Linux下使用RAW SOCKET比较方便,我写了个Ruby脚本来实现简单的发包功能。

require 'socket'

interface = 'enp0s3'        # name of the interface to send from
siocgifindex = 0x8933       # ioctl request number of SIOCGIFINDEX

# Ethernet addresses: destination 08:00:27:91:f7:d3 followed by source
# 0a:00:27:00:00:0a. Every byte is written as a \x escape so that the two
# addresses occupy exactly 12 bytes; .b makes the string binary.
frame = "\x08\x00\x27\x91\xf7\xd3\x0a\x00\x27\x00\x00\x0a".b

# Fill the remainder of the 100-byte frame with the byte values 12..99.
(12...100).each do |i|
  frame << i.chr
end

socket = Socket.new(Socket::AF_PACKET, Socket::SOCK_RAW, Socket::IPPROTO_RAW)

# Ask the kernel for the interface index: the request buffer starts with the
# interface name, and the 4-byte index is read back from offset 16.
ifreq = [interface.dup].pack('a32')
socket.ioctl(siocgifindex, ifreq)

# Link-layer socket address: family (2 bytes), protocol (2 bytes),
# interface index (4 bytes), remaining 12 bytes zero.
sockaddr_ll = [Socket::AF_PACKET].pack('s') +
              [Socket::IPPROTO_RAW].pack('n') +
              ifreq[16, 4] +
              ("\x00" * 12)
socket.bind(sockaddr_ll)

socket.send(frame, 0)

Windows下直接发包有点问题,可以使用WinPcap这个库来发包。使用C++编写的发包程序如下:

#define WIN32
#include <cstdlib>
#include <cstdio>
#include <pcap.h>

// Send one hand-built 100-byte Ethernet frame through a WinPcap adapter.
//
// Returns 0 on success, 2 when the adapter cannot be opened and 3 when the
// frame cannot be sent.
int main(int argc, char **argv) {
  pcap_t *fp;
  char errbuf[PCAP_ERRBUF_SIZE];
  int i;
  int rc = 0;

  // Bytes 0-5: destination MAC 08:00:27:91:f7:d3.
  // Bytes 6-11: source MAC 0a:00:27:00:00:0a.
  u_char packet[100] = {
    0x08, 0x00, 0x27, 0x91, 0xf7, 0xd3,
    0x0a, 0x00, 0x27, 0x00, 0x00, 0x0a
  };

  // A string literal must be referenced through a pointer to const.
  const char *dev = "{3AE37740-F834-43CF-9BC0-20548F61CCF4}";

  /* Open the adapter */
  if ((fp = pcap_open_live(dev, 65536, 1, 1000, errbuf)) == NULL) {
    fprintf(stderr,"\nUnable to open the adapter. %s is not supported by WinPcap\n", dev);
    // errbuf carries the reason reported by the library.
    fprintf(stderr, "%s\n", errbuf);
    return 2;
  }

  /* Fill the rest of the frame with the byte values 12..99 */
  for (i = 12; i < 100; i++) {
    packet[i] = (u_char)i;
  }

  if (pcap_sendpacket(fp, packet, 100) != 0) {
    fprintf(stderr,"\nError sending the packet: %s\n", pcap_geterr(fp));
    rc = 3;
  }

  // Close the adapter on both the success and the failure path.
  pcap_close(fp);
  return rc;
}

{3AE37740-F834-43CF-9BC0-20548F61CCF4}才是Windows下的设备名称,而不是“以太网”、“网络连接1”之类的显示名称。要方便快捷地获取设备名称,可以执行wireshark -D命令。

最后我们使用Wireshark抓一下包

源主机发出的包,又被转发给了源主机。Wireshark的时间显示错误可不是我的锅!(╯‵□′)╯︵┻━┻