Zinx 框架之 UDP

发布时间:2023年12月27日

Zinx 本身没有提供 UDP 实现,官网示例是通过 Zinx KCP 来实现 UDP 通信的。

我们同样通过标准库 net 来实现 UDP。项目需要同时支持 UDP、TCP、WebSocket 等协议,而 Zinx 已经实现了 TCP 和 WebSocket,唯独缺少 UDP,所以本文讲述如何在 Zinx 框架中加入 UDP。

UDP 的实现沿用 Zinx 自身的风格。

udpclient.go:

package znet

import (
	"fmt"
	"net"
	"sync"
	"ziface"
	"zlog"
)

// UdpClient is a UDP client built the same way as the other Zinx clients:
// it dials a server, wraps the socket in a connection and hands incoming
// data to a router.
type UdpClient struct {
	// Exported configuration.

	// Name is the name of this client/connection.
	Name string
	// Ip is the IP of the target server to connect to.
	Ip string
	// Port is the port of the target server to connect to.
	Port int
	// ErrChan carries errors raised while starting the client.
	ErrChan chan error
	// Router is the router handed to the connection created in Start;
	// it is set through AddRouterFunc.
	Router ziface.IRouter

	// Internal state.

	// exitChan signals that the client is being stopped.
	exitChan chan struct{}
	// msgHandler is the message management module.
	msgHandler ziface.IMsgHandle
	// msgLock is the lock intended for sending/receiving messages
	// (not referenced by the methods shown in this file).
	msgLock sync.RWMutex
	// connID is the ID passed on to the connection (the device ID).
	connID uint64
	// conn is the connection instance created by Start.
	conn ziface.IConnection
	// queue is the queue exposed through GetQueue.
	queue ziface.IQueue
}

// NewUdpClient creates a UDP client that will connect to ip:port.
//
// name is the client name; ip and port identify the target server. The
// returned client has its message handler, error channel and queue
// initialised; the router and connection ID are set afterwards through
// AddRouterFunc and SetConnID.
func NewUdpClient(name string, ip string, port int) ziface.IUdpClient {
	return &UdpClient{
		Name:       name,
		Ip:         ip,
		Port:       port,
		msgHandler: newMsgHandle(),
		// Start reports at most one error before giving up, so a buffer of
		// one lets it do so without blocking when nobody is reading yet.
		ErrChan: make(chan error, 1),
		queue:   NewQueue(),
	}
}

// Start resolves the server address, dials it over UDP, starts the
// connection and then blocks until Stop signals exitChan.
//
// If resolving or dialing fails, the error is logged and sent on ErrChan,
// and Start returns. Callers that must not block should run it in its own
// goroutine (go client.Start()).
func (u *UdpClient) Start() {
	fmt.Printf("UdpConnect start \n")
	u.exitChan = make(chan struct{})

	// Resolve the UDP address of the server.
	udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", u.Ip, u.Port))
	if err != nil {
		zlog.Ins().ErrorF("[START] resolve udp addr err: %v\n", err)
		u.ErrChan <- err
		return
	}

	// Create the socket and obtain the *net.UDPConn.
	connUDP, err := net.DialUDP("udp", nil, udpAddr)
	if err != nil {
		zlog.Ins().ErrorF("client connect to server failed, err:%v", err)
		u.ErrChan <- err
		return
	}

	// newClientConnUDP takes the UDPConn by value, hence the dereference.
	u.conn = newClientConnUDP(u, *connUDP, u.Router)
	zlog.Ins().InfoF("[START] udp  Client LocalAddr: %s, RemoteAddr: %s\n", u.conn.LocalAddr(), u.conn.RemoteAddr())

	// Run the connection in the background.
	go u.conn.Start()

	// Wait for Stop; a receive also completes when exitChan is closed.
	<-u.exitChan
	zlog.Ins().InfoF("client exit.")
}


// Stop shuts the client down: it stops the connection if one was
// established, releases everything waiting on exitChan and closes ErrChan.
//
// It is safe to call when Start failed or was never called. It must be
// called at most once, because the channels are closed here.
func (u *UdpClient) Stop() {
	// u.conn is only set once Start has dialed successfully.
	if u.conn != nil {
		zlog.Ins().InfoF("[STOP] udp  Client LocalAddr: %s, RemoteAddr: %s\n", u.conn.LocalAddr(), u.conn.RemoteAddr())
		u.conn.Stop()
	}

	// Closing wakes every receiver and, unlike a send, never blocks when
	// nobody is listening any more.
	if u.exitChan != nil {
		close(u.exitChan)
	}
	if u.ErrChan != nil {
		close(u.ErrChan)
	}
}

// AddRouterFunc stores router as the client's Router and prints a
// confirmation line to standard output.
func (u *UdpClient) AddRouterFunc(router ziface.IRouter) {
	u.Router = router
	fmt.Println("Add Router Succ!!")
}

// GetMsgHandler returns the client's message management module.
func (u *UdpClient) GetMsgHandler() ziface.IMsgHandle {
	return u.msgHandler
}

// Conn returns the client's connection instance. It is nil until Start has
// created the connection.
func (u *UdpClient) Conn() ziface.IConnection {
	return u.conn
}

// GetQueue returns the queue owned by this client.
func (u *UdpClient) GetQueue() ziface.IQueue {
	return u.queue
}

// GetErrChan returns the channel on which Start reports errors.
func (u *UdpClient) GetErrChan() chan error {
	return u.ErrChan
}

// SetName sets the client's name.
func (u *UdpClient) SetName(name string) {
	u.Name = name
}

// GetName returns the client's name.
func (u *UdpClient) GetName() string {
	return u.Name
}

// SetConnID sets the ID that is passed on to the client's connection.
func (u *UdpClient) SetConnID(connID uint64) {
	u.connID = connID
}

// GetConnID returns the ID previously stored with SetConnID.
func (u *UdpClient) GetConnID() uint64 {
	return u.connID
}

接口:iudpclient.go

package ziface

// IUdpClient is the interface implemented by the UDP client.
type IUdpClient interface {
	// Lifecycle.

	// Start starts the client.
	Start()
	// Stop stops the client.
	Stop()

	// Connection and messaging.

	// Conn returns the client's connection.
	Conn() IConnection
	//Send(data []byte) error

	// GetMsgHandler returns the client's message handler.
	GetMsgHandler() IMsgHandle
	// GetQueue returns the client's queue.
	GetQueue() IQueue
	// GetErrChan returns the error channel of this client.
	GetErrChan() chan error

	// Routing: AddRouterFunc registers the router used to process data
	// arriving on the client's connection.
	AddRouterFunc(IRouter)

	// Identity.

	// SetName sets the name of this client.
	SetName(string)
	// GetName returns the name of this client.
	GetName() string
	// SetConnID sets the ID of this client.
	SetConnID(uint64)
	// GetConnID returns the ID of this client.
	GetConnID() uint64
}

完成后我们再来修改 Connection.go

TCP 和 UDP 都基于 net 库实现,但 TCP 连接的类型是 net.Conn,而 net.DialUDP 返回的是 *net.UDPConn(本文将其解引用后以 net.UDPConn 值的形式传入),因此我们将 Connection 的 conn 字段定义为 interface{},使用时再通过类型断言区分。

新增newClientConnUDP

// newClientConnUDP wraps a dialed UDP socket in a Connection for client.
//
// The connection takes its ID, name and message handler from client and its
// local/remote addresses from conn; router is stored as the connection's
// Router and a fresh queue is created.
//
// NOTE(review): conn is received by value, so the Connection stores a
// net.UDPConn (not a *net.UDPConn); the type switches in StartReader and
// StartWriter match on that value type.
func newClientConnUDP(client ziface.IUdpClient, conn net.UDPConn, router ziface.IRouter) ziface.IConnection {
	return &Connection{
		conn:     conn,
		connID:   client.GetConnID(), // client ignore
		isClosed: false,
		// Left nil: a receive from a nil channel never completes, so
		// StartWriter only leaves its loop through the context.
		msgBuffChan: nil,
		property:    nil,
		name:        client.GetName(),
		localAddr:   conn.LocalAddr().String(),
		remoteAddr:  conn.RemoteAddr().String(),
		Router:      router,
		queue:       NewQueue(),
		msgHandler:  client.GetMsgHandler(),
	}
}

StartWriter 修改

// StartWriter is the connection's write loop. It takes messages off
// msgBuffChan and writes them to the underlying connection until the
// connection context is cancelled, msgBuffChan is closed, or a write fails.
func (c *Connection) StartWriter() {
	zlog.Ins().InfoF("Writer Goroutine is running")
	defer zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String())

	for {
		select {
		case data, ok := <-c.msgBuffChan:
			if !ok {
				// A closed channel is always ready to receive; returning
				// (rather than break, which only leaves the select) keeps
				// the loop from spinning.
				zlog.Ins().ErrorF("msgBuffChan is Closed")
				return
			}

			var err error
			switch conn := c.conn.(type) {
			case net.Conn:
				_, err = conn.Write(data)
			case net.UDPConn:
				_, err = conn.Write(data)
			default:
				fmt.Println("未知类型的连接")
			}
			if err != nil {
				// The writer exits on a failed write, as the message says.
				zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err)
				return
			}
		case <-c.ctx.Done():
			return
		}
	}
}

StartReader修改

// StartReader is the connection's read loop. Each iteration reads up to
// zconf.GlobalObject.IOReadBuffSize bytes from the underlying connection,
// wraps them in a Request and passes it to Router.PreHandle in a new
// goroutine. The loop ends when the context is cancelled, a read fails or
// the connection has an unsupported type; c.Stop is always called on exit.
func (c *Connection) StartReader() {
	zlog.Ins().InfoF("[Reader Goroutine is running]")
	defer zlog.Ins().InfoF("%s [conn Reader exit!]", c.RemoteAddr().String())
	defer c.Stop()
	defer func() {
		if err := recover(); err != nil {
			zlog.Ins().ErrorF("connID=%d, panic err=%v", c.GetConnID(), err)
		}
	}()

	for {
		// Non-blocking cancellation check before each read.
		select {
		case <-c.ctx.Done():
			return
		default:
		}

		// A fresh buffer per read: the request handed to the router
		// goroutine keeps a slice of it.
		buffer := make([]byte, zconf.GlobalObject.IOReadBuffSize)

		// Read data from the connection's IO into the memory buffer.
		var (
			n   int
			err error
		)
		switch conn := c.conn.(type) {
		case net.Conn:
			n, err = conn.Read(buffer)
		case net.UDPConn:
			n, err = conn.Read(buffer)
		default:
			// Nothing can be read from an unknown connection type; stop
			// instead of spinning and dispatching empty requests.
			fmt.Println("未知类型的连接")
			return
		}
		if err != nil {
			zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err)
			return
		}
		fmt.Printf("Received %v  data: %v  ", string(buffer[0:n]), len(buffer[0:n]))

		// Build the Request for the data just read from this connection.
		req := &Request{
			conn: c,
			data: buffer[0:n],
		}

		// Run the router bound to this connection. A nil Router would panic
		// inside the new goroutine, where the recover above cannot catch it.
		if c.Router != nil {
			go func(request ziface.IRequest) {
				c.Router.PreHandle(request)
			}(req)
		} else {
			zlog.Ins().ErrorF("connID=%d, no router registered, data dropped", c.GetConnID())
		}

		zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(buffer[0:n]))

		// If normal data is read from the peer, update the heartbeat
		// detection Active state.
		if n > 0 && c.hc != nil {
			c.updateActivity()
		}
	}
}

其他地方均可以参考这样来修改

	// Dispatch on the concrete type stored in c.conn and return that
	// connection's remote address.
	switch conn := c.conn.(type) {
	case net.Conn:
		return conn.RemoteAddr()
	case net.UDPConn:
		return conn.RemoteAddr()
	default:
		// Prints "unknown connection type".
		// NOTE(review): this branch does not return; the enclosing function
		// (not shown here) presumably needs a return after the switch — confirm.
		fmt.Println("未知类型的连接")
	}

测试

// clientUdp creates a UDP client for 127.0.0.1:10003, gives it a connection
// ID and a PingRouter, then starts the client and the periodic sender in
// separate goroutines. It returns immediately; the caller has to keep the
// process alive.
func clientUdp() {
	// The constructor defined in znet is NewUdpClient.
	client := znet.NewUdpClient("Test", "127.0.0.1", 10003)
	client.SetConnID(123456789)
	client.AddRouterFunc(&PingRouter{})
	go client.Start()
	go send(client)
}

// send writes the payload "strCommand" to the client's connection every ten
// seconds, forever.
func send(client ziface.IUdpClient) {
	for {
		time.Sleep(10 * time.Second)
		conn := client.Conn()
		if conn == nil {
			// Start has not established the connection (yet, or at all);
			// try again on the next tick instead of calling a method on nil.
			continue
		}
		conn.Send([]byte("strCommand"))
	}
}

可以调用绑定的自定义路由。

本文中的路由是改过的,可以不用绑定msgID 。

文章来源:https://blog.csdn.net/windows_oracle/article/details/135231144
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。