Zinx 本身没有提供原生的 UDP 支持,官网的例子是通过 Zinx KCP 来实现 UDP 通信的。
我们这里直接使用标准库 net 来实现 UDP。项目同时需要 UDP、TCP、WebSocket 等多种协议,而 Zinx 已经实现了 TCP 和 WebSocket,唯独缺少 UDP,所以本文讲述如何在 Zinx 框架中加入 UDP。
UDP 的实现沿用 Zinx 现有的组织方式。
文件:udpclient.go
package znet
import (
"fmt"
"net"
"sync"
"ziface"
"zlog"
)
// UdpClient is a UDP client. It stores the address of the target server
// together with the connection, router and message-handling state that its
// methods operate on.
type UdpClient struct {
	// Name is the name of this client/connection.
	Name string
	// Ip is the IP of the target server to connect to.
	Ip string
	// Port is the port of the target server to connect to.
	Port int
	// exitChan signals that the client should exit. It is created by Start
	// and signalled by Stop.
	exitChan chan struct{}
	// msgHandler is the message management module.
	msgHandler ziface.IMsgHandle
	// ErrChan carries the resolve/dial errors reported by Start.
	ErrChan chan error
	// Router is the router registered through AddRouterFunc; it is handed to
	// the connection created in Start.
	Router ziface.IRouter
	// msgLock is intended to guard message send/receive.
	// NOTE(review): no method shown in this file uses it.
	msgLock sync.RWMutex
	// connID is the device ID, also used as the connection ID.
	connID uint64
	// conn is the connection instance; it is set by Start after a successful dial.
	conn ziface.IConnection
	// queue is the queue exposed through GetQueue.
	queue ziface.IQueue
}
// NewUdpClient builds a UDP client named name that targets ip:port.
//
// No network activity happens here; the connection is created when Start is
// called. The returned client has a fresh message handler, an unbuffered
// error channel and a new queue.
func NewUdpClient(name string, ip string, port int) ziface.IUdpClient {
	// The struct defined in this file is UdpClient and the interface it
	// implements is ziface.IUdpClient; use those names so the constructor
	// agrees with the declarations.
	return &UdpClient{
		Name:       name,
		Ip:         ip,
		Port:       port,
		msgHandler: newMsgHandle(),
		ErrChan:    make(chan error),
		queue:      NewQueue(),
	}
}
// Start resolves the server address, dials it over UDP, starts the resulting
// connection in its own goroutine, and then blocks until Stop signals
// exitChan.
//
// A resolve or dial failure is logged and sent on ErrChan, after which Start
// returns. ErrChan is unbuffered, so that send blocks until a receiver reads
// the error.
//
// Start blocks the calling goroutine until the client exits; call it with
// `go` when the caller needs to continue. Unlike an unconditional
// `select {}`, it returns once the client has been stopped, so the calling
// goroutine is not leaked.
func (u *UdpClient) Start() {
	fmt.Printf("UdpConnect start \n")
	u.exitChan = make(chan struct{})

	// Resolve the UDP address of the target server.
	udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", u.Ip, u.Port))
	if err != nil {
		zlog.Ins().ErrorF("[START] resolve udp addr err: %v\n", err)
		u.ErrChan <- err
		return
	}

	// Create the UDP socket.
	connUdp, err := net.DialUDP("udp", nil, udpAddr)
	if err != nil {
		// Connection failed.
		zlog.Ins().ErrorF("client connect to server failed, err:%v", err)
		u.ErrChan <- err
		return
	}

	// newClientConnUDP takes the UDPConn by value, hence the dereference.
	u.conn = newClientConnUDP(u, *connUdp, u.Router)
	zlog.Ins().InfoF("[START] udp Client LocalAddr: %s, RemoteAddr: %s\n", u.conn.LocalAddr(), u.conn.RemoteAddr())

	// Run the connection concurrently so this goroutine can wait for exit.
	go u.conn.Start()

	// Wait for Stop; both a sent value and a close wake this receive.
	<-u.exitChan
	zlog.Ins().InfoF("client exit.")
}
// Stop shuts the client down: it stops the connection (if one was
// established), signals exit to Start by closing exitChan, and closes ErrChan.
//
// Stop is safe to call when Start never ran or when the dial failed; in that
// case there is no connection to stop. Closing exitChan instead of sending on
// it means Stop never blocks when no goroutine is waiting for the signal.
//
// Stop must be called at most once: a second call would close already-closed
// channels and panic.
func (u *UdpClient) Stop() {
	// conn stays nil until Start has dialed successfully.
	if u.conn != nil {
		zlog.Ins().InfoF("[STOP] udp Client LocalAddr: %s, RemoteAddr: %s\n", u.conn.LocalAddr(), u.conn.RemoteAddr())
		u.conn.Stop()
	}

	// exitChan is created by Start; closing a nil channel would panic.
	if u.exitChan != nil {
		close(u.exitChan)
	}

	close(u.ErrChan)
}
// AddRouterFunc registers router as the router of this client and prints a
// confirmation line to standard output.
func (u *UdpClient) AddRouterFunc(router ziface.IRouter) {
	u.Router = router
	fmt.Println("Add Router Succ!!")
}
// GetMsgHandler returns the message management module of this client.
func (u *UdpClient) GetMsgHandler() ziface.IMsgHandle {
	return u.msgHandler
}
// Conn returns the current connection instance. It is nil until Start has
// created the connection.
func (u *UdpClient) Conn() ziface.IConnection {
	return u.conn
}
// GetQueue returns the queue owned by this client.
func (u *UdpClient) GetQueue() ziface.IQueue {
	return u.queue
}
// GetErrChan returns the channel on which Start reports address-resolution
// and dial errors.
func (u *UdpClient) GetErrChan() chan error {
	return u.ErrChan
}
// SetName sets the name of this client.
func (u *UdpClient) SetName(name string) {
	u.Name = name
}
// GetName returns the name of this client.
func (u *UdpClient) GetName() string {
	return u.Name
}
// SetConnID sets the ID of this client.
func (u *UdpClient) SetConnID(connID uint64) {
	u.connID = connID
}
// GetConnID returns the ID of this client.
func (u *UdpClient) GetConnID() uint64 {
	return u.connID
}
接口文件:iudpclient.go
package ziface
// IUdpClient is the interface of a UDP client: lifecycle control, access to
// its connection, and accessors for its name, ID, router, message handler,
// error channel and queue.
type IUdpClient interface {
	// Start starts the client.
	Start()
	// Stop stops the client.
	Stop()
	// Conn returns the client's connection.
	Conn() IConnection
	//Send(data []byte) error

	// GetErrChan returns the error channel of this client.
	GetErrChan() chan error

	// SetName sets the name of this client.
	SetName(string)
	// GetName returns the name of this client.
	GetName() string

	// AddRouterFunc registers the router used to handle data arriving on
	// this client's connection.
	AddRouterFunc(IRouter)
	// GetMsgHandler returns the client's message handler.
	GetMsgHandler() IMsgHandle

	// SetConnID sets the ID of this client.
	SetConnID(uint64)
	// GetConnID returns the ID of this client.
	GetConnID() uint64

	// GetQueue returns the client's queue.
	GetQueue() IQueue
}
完成后我们再来修改 Connection.go。
TCP 和 UDP 都是通过 net 库实现的,但 TCP 得到的是 net.Conn,而 UDP 得到的是 net.UDPConn(net.DialUDP 返回 *net.UDPConn,本文解引用后按值传递),因此我们将 Connection 的 conn 字段定义为 interface 类型,使用时再通过类型判断加以区分。
新增 newClientConnUDP:
// newClientConnUDP wraps an established UDP socket in a Connection for the
// given client.
//
// The connection takes its ID, name and message handler from client, records
// the socket's local and remote addresses as strings, uses router for
// dispatching, and gets its own new queue. msgBuffChan, property and isClosed
// are left at their zero values.
//
// conn is received by value and stored as a net.UDPConn value, which is the
// dynamic type the `case net.UDPConn` type switches elsewhere in Connection
// match on.
func newClientConnUDP(client ziface.IUdpClient, conn net.UDPConn, router ziface.IRouter) ziface.IConnection {
	return &Connection{
		conn:       conn,
		connID:     client.GetConnID(),
		name:       client.GetName(),
		localAddr:  conn.LocalAddr().String(),
		remoteAddr: conn.RemoteAddr().String(),
		Router:     router,
		queue:      NewQueue(),
		msgHandler: client.GetMsgHandler(),
	}
}
修改 StartWriter:
// StartWriter is the write loop of the connection. It takes data from
// msgBuffChan and writes it to the underlying socket until one of the
// following happens:
//   - the connection context is cancelled,
//   - msgBuffChan is closed,
//   - a write to the socket fails.
//
// The exits use return rather than break: a break inside the switch/select
// only leaves that statement, so the surrounding for loop would keep running
// (and would spin on a closed channel).
func (c *Connection) StartWriter() {
	zlog.Ins().InfoF("Writer Goroutine is running")
	defer zlog.Ins().InfoF("%s [conn Writer exit!]", c.RemoteAddr().String())

	for {
		select {
		case data, ok := <-c.msgBuffChan:
			if !ok {
				zlog.Ins().ErrorF("msgBuffChan is Closed")
				return
			}

			// conn is stored as an interface value; pick the concrete
			// connection kind before writing.
			switch conn := c.conn.(type) {
			case net.Conn:
				if _, err := conn.Write(data); err != nil {
					zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err)
					return
				}
			case net.UDPConn:
				if _, err := conn.Write(data); err != nil {
					zlog.Ins().ErrorF("Send Buff Data error:, %s Conn Writer exit", err)
					return
				}
			default:
				// Unknown connection type: the data is dropped.
				fmt.Println("未知类型的连接")
			}
		case <-c.ctx.Done():
			return
		}
	}
}
修改 StartReader:
// StartReader is the read loop of the connection. Each iteration reads one
// chunk of data from the underlying socket, wraps it in a Request and hands
// it to the router's PreHandle in a new goroutine.
//
// The loop ends when the connection context is cancelled, when a read fails,
// or when the stored connection is of an unknown type. On exit the connection
// is stopped; a panic in this goroutine is recovered and logged.
func (c *Connection) StartReader() {
	zlog.Ins().InfoF("[Reader Goroutine is running]")
	defer zlog.Ins().InfoF("%s [conn Reader exit!]", c.RemoteAddr().String())
	defer c.Stop()
	defer func() {
		if err := recover(); err != nil {
			zlog.Ins().ErrorF("connID=%d, panic err=%v", c.GetConnID(), err)
		}
	}()

	for {
		// Stop reading once the connection context is done.
		select {
		case <-c.ctx.Done():
			return
		default:
		}

		// A fresh buffer per read: the request handed to the router goroutine
		// keeps a slice of it, so it must not be reused.
		buffer := make([]byte, zconf.GlobalObject.IOReadBuffSize)

		// Read data from the connection's IO into the memory buffer.
		var (
			n   int
			err error
		)
		switch conn := c.conn.(type) {
		case net.Conn:
			n, err = conn.Read(buffer)
		case net.UDPConn:
			n, err = conn.Read(buffer)
		default:
			// Nothing can be read from an unknown connection type; looping
			// again would only dispatch empty requests forever.
			fmt.Println("未知类型的连接")
			return
		}
		if err != nil {
			zlog.Ins().ErrorF("read msg head [read datalen=%d], error = %s", n, err)
			return
		}

		payload := buffer[0:n]
		fmt.Printf("Received %v data: %v ", string(payload), len(payload))

		// Build the Request for the data just read from this connection.
		req := &Request{
			conn: c,
			data: payload,
		}

		// Dispatch to the registered router. A panic in the handler goroutine
		// would not be caught by the recover above, so guard against a
		// missing router here.
		if router := c.Router; router != nil {
			go router.PreHandle(req)
		} else {
			zlog.Ins().ErrorF("connID=%d, no router registered, data dropped", c.GetConnID())
		}

		zlog.Ins().DebugF("read buffer %s \n", hex.EncodeToString(payload))

		// If normal data is read from the peer, update the heartbeat
		// detection Active state.
		if n > 0 && c.hc != nil {
			c.updateActivity()
		}
	}
}
其他用到 conn 的地方都可以参照下面的方式修改:
// Example fragment: choose the concrete connection kind stored in c.conn
// before calling a method on it (here RemoteAddr).
// NOTE(review): the default branch only prints; an enclosing function that
// returns a value still needs a return statement after this switch.
switch conn := c.conn.(type) {
case net.Conn:
return conn.RemoteAddr()
case net.UDPConn:
return conn.RemoteAddr()
default:
// Unknown connection type.
fmt.Println("未知类型的连接")
}
测试
// clientUdp creates a UDP client for 127.0.0.1:10003, gives it a connection
// ID and a PingRouter, and then starts the client and the periodic sender in
// separate goroutines. It returns immediately.
func clientUdp() {
	// The constructor defined in package znet is NewUdpClient.
	client := znet.NewUdpClient("Test", "127.0.0.1", 10003)
	client.SetConnID(123456789)
	client.AddRouterFunc(&PingRouter{})

	go client.Start()
	go send(client)
}
// send sends the test payload "strCommand" over the client's connection every
// 10 seconds, forever.
//
// client.Conn() is nil until the client has dialed successfully, so a nil
// connection is skipped instead of being dereferenced.
func send(client ziface.IUdpClient) {
	for {
		time.Sleep(10 * time.Second)

		conn := client.Conn()
		if conn == nil {
			// Not connected (yet); try again on the next tick.
			continue
		}
		conn.Send([]byte("strCommand"))
	}
}
运行后即可调用到绑定的自定义路由。
注意:本文中的路由是修改过的版本,不需要绑定 msgID。