在 Golang 语言标准库之中提供了,对于TCP/IP链接、侦听器的高级封装支持,这易于上层开发人员轻松基于这些BCL(基础类库)实现期望的功能。
TCP/IP链接(客户端)
net.Conn 接口
TCP/IP侦听器(服务器)
net.Listener
Golang 提供了易用的写入数据到远程(对端)实现,而不比像 C/C++?这类传统的编程语言,人们需要自行处理发送的字节数。
例如:
原生:send、WSASend、WSPSend 等函数
ASIO:stream::async_write_some? ? 等函数
它与 Microsoft .NET 提供的 System.Net.Socket 类的发送函数功能是类似的,调用该函数发送数据,它会确保数据全部发送到对端(远端),否则视为失败。
在实际生产环境之中,绝大多数的场景上面,人们的确不需要调用一次发送函数,但不保证本次期望传送数据全部发送成功,而是潜在的可能只发送一部分,还需要开发人员自行处理,这样繁琐的TCP网络程序发送实现的。
但这在一些特定场景的网络程序上面是有意义的,例如我们需要知道已用掉了多少的流量,因为这一次缓冲区发送并没有全部传送到远端,但已经传送了一部分也生产了网络带宽资源的浪费,所以,像这种问题,Golang 不提供类似接口,它这块的不自由,是会有一些问题的。
较为庆幸的是:
net.Conn 接口提供的?Read 函数并非是保证一定读入期望BUF大小的,否则这个在很多类型的网络程序上面就很坑人了。
它就相当于传统阻塞的?recv,不会出现非阻塞的EAGIN要求开发人员重试的操作的问题,所有它只有返回已收到的字节数,或发生错误。
当然人们仍需处理一个特殊的情况,recv 可能返回FIN 0字节,但并非错误,这是因为对端正确的关闭了TCP链接时产生的。
但遇到类似这类型的场景还是用 C/C++、或者CGO调用原生C API来实现把,功能上面都可以解决,只是用GO语言整会很麻烦就是了。
本文提供一个简单的网络传输协议,适用四个字节来表示长度,一个字节来表示关键帧字,不考虑对于流的效验合(checksum)的计算及验证,人们若有需求可以自行修改,在大多数的TCP应用协议服务器上面,它都可以经过少量修改集成到解决方案之中。(Go 语言之中或许该称为集成到 Package 程序包之中)
四个字节长度,可以描述到一帧最大?INT32_MAX(2147483647)字节封装传送,其实绝大多数情况传递大包是没有太大意义的,人们可以自行评估调整。
值得一提,在绝大多数的场景之中,如若产生大包,三个字节来表示长度,人们自行位运算即可,这是因为过大的帧长,可能会导致网络程序在接受这些大数据帧时,产生严重的内存恐慌问题。
个人一个好的建议是,对于追求网络吞吐性能的TCP应用协议,人们在适用 Golang 应该直接废弃掉,没有任何意义的各种接口及封装实现,如返回??io.Reader,并且应当适用固定缓冲区的最大帧对齐,如:4K,即用户不要发送超过最大对齐(4K)的单帧报文。
随机内存分配会导致碎片化的产生,影响网络程序的吞吐能力,同时频繁的内存复制也会导致内存、及CPU计算资源负载升高。
但在大多数场景的网络程序来说,并不需要在意这块的优化,因为没有太大意义,但对于纯网络IO密集型应用来说,这是有很大必要的。
本文提供的实现不适用上述场景,但可以适用于略微带一些大包处理(即用户不愿意在业务层分片、组片的场景),但本人更希望大家趋近于共同学习目的。
运行测试:
go run -race test.go
服务器及客户端实现及封装:(含测试用例)
main.go
package main
import (
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"strconv"
"sync"
"time"
)
type _ConnectionReader struct {
owner *Connection
length int
offset int
checksum uint32
header_recv []byte
lock_recv sync.Mutex
}
type Connection struct {
disposed bool
connection net.Conn
header_send []byte
lock_sent sync.Mutex
reader *_ConnectionReader
listener *Listener
}
type Listener struct {
sync.Mutex
disposed bool
listener net.Listener
connections map[*Connection]bool
}
/*
#pragma pack(push, 1)
typedef struct {
BYTE bKf; // 关键帧字
DWORD dwLength; // 载荷长度
} PACKET_HEADER;
#pragma pack(pop)
static constexpr int PACKET_HEADER_SIZE = sizeof(PACKET_HEADER); // 4 + 1 = 5 BYTE
*/
const (
_CONNECTION_PACKET_HEADER_KF = 0x2A // 置关键帧字
_CONNECTION_PACKET_HEADER_SIZE = 5
CONNECTION_MIN_PORT = 0
CONNECTION_MAX_PORT = math.MaxUint16
)
var ErrConnectionClosed = errors.New("connection has been closed")
var ErrConnectionArgP = errors.New("the parameter p cannot be incorrectly null or array length 0")
var ErrConnectionProtocolKf = errors.New("network protocol error, kf check error")
var ErrConnectionProtocolLength = errors.New("network protocol error, length check error")
var ErrConnectionArgAcceptor = errors.New("the acceptor parameter cannot be null")
var ErrConnectionDisconnect = errors.New("connection has been disconnect")
// 功能名:发送数据
// 返回值:
// < 0 发送错误(ERR)
// == 0 链接断开(FIN)
// > 0 已发送字节数
func (my *Connection) Send(buffer []byte, offset int, length int) int {
// 对于欲发送数据的参数检查
if buffer == nil || offset < 0 || length < 1 {
return -1
}
// 检查是否溢出BUFF缓存大小
len := len(buffer)
if offset+length > len {
return -1
}
// 检查链接是否存在
connection := my.connection
if connection == nil {
return -1
}
// 预备环境及变量
bytes_transferred := 0
sync := &my.lock_sent
header := my.header_send
payload := buffer[offset : offset+length]
// 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。
sync.Lock()
defer sync.Unlock()
// 检查当前链接是否已经释放
if my.disposed {
return -1
}
// 先发送协议帧头
header[0] = _CONNECTION_PACKET_HEADER_KF
binary.BigEndian.PutUint32(header[1:], uint32(length))
written_size, err := connection.Write(header)
if err != nil {
return -1
} else {
bytes_transferred += written_size
}
// 在发送协议载荷
written_size, err = connection.Write(payload)
if err != nil {
return -1
}
// 加上已传送的字节数
bytes_transferred += written_size
return bytes_transferred
}
// 功能名:收取数据
// 上个 Reader 未完成之前一直阻塞当前协程直到对方结束后返回
func (my *Connection) Receive() io.Reader {
// 检查当前链接是否已经释放
if my.disposed {
return nil
}
// 检查链接是否存在
connection := my.connection
if connection == nil {
return nil
}
// 返回帧读入器
reader := my.reader
reader.lock_recv.Lock()
return reader
}
// 功能名:实例化一个链接对象
func NewConnection(conn net.Conn, listener *Listener) *Connection {
var connection *Connection
if conn != nil {
connection = &Connection{
disposed: false,
connection: conn,
listener: listener,
header_send: make([]byte, _CONNECTION_PACKET_HEADER_SIZE),
}
connection.reader = &_ConnectionReader{
owner: connection,
length: 0,
offset: 0,
checksum: 0,
header_recv: make([]byte, _CONNECTION_PACKET_HEADER_SIZE),
}
}
return connection
}
// 功能名:链接主机
func Connect(host string, port int) *Connection {
// 检查端口参数的有效性
if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT {
return nil
}
// 服务器主机地址不可为空
if len(host) < 1 {
return nil
}
// 服务器地址并且尝试链接
address := host + ":" + strconv.Itoa(port)
conn, err := net.Dial("tcp", address)
if err != nil {
return nil
}
// 返回TCP链接的封装对象
return NewConnection(conn, nil)
}
// 功能名:关闭链接(网络)
func (my *Connection) close(connection net.Conn) error {
// 强制关闭链接,但可能会失败
if my.disposed {
return nil
}
my.disposed = true
return connection.Close()
}
// 功能名:关闭链接
func (my *Connection) Close(await bool) (err error) {
// 检查链接是否存在
connection := my.connection
if connection == nil {
return
}
// 如果可以直接获取到信号,否则其它协同程序就等待发送结束,不要用管道这些莫名其妙的东西。
sync := &my.lock_sent
if await {
sync.Lock()
sync.Unlock()
// 检查当前链接是否已经释放
err = my.close(connection)
} else {
err = my.close(connection)
}
// 如果是服务器接受的链接对象,就从服务器列表之中删除这个链接实例。
listener := my.listener
if listener != nil {
listener.Lock()
delete(listener.connections, my)
listener.Unlock()
}
return
}
func (my *Connection) connection_get_ip_end_point(remote bool) string {
connection := my.connection
if connection == nil {
return ""
}
var address net.Addr
if remote {
address = connection.RemoteAddr()
} else {
address = connection.LocalAddr()
}
if address == nil {
return ""
}
return address.String()
}
// 功能名:获取远程地址
func (my *Connection) GetRemoteEndPoint() string {
return my.connection_get_ip_end_point(true)
}
// 功能名:获取本地地址
func (my *Connection) GetLocalEndPoint() string {
return my.connection_get_ip_end_point(false)
}
// 功能名:读入帧数据
func (my *_ConnectionReader) Read(p []byte) (n int, err error) {
// 检查当前链接是否已经释放
owner := my.owner
if owner.disposed {
return 0, ErrConnectionClosed
}
// 检查参数P不可以为NUL或数组长度为0
length := len(p)
if length < 1 {
return 0, ErrConnectionArgP
}
// 帧已经被全部收取完成
if my.length < 0 {
my.length = 0
my.lock_recv.Unlock()
return 0, io.EOF
}
// 收取协议报文的头部
if my.length == 0 {
header := my.header_recv
n, err := io.ReadFull(owner.connection, header)
if err != nil {
return n, err
}
// 判断协议关键帧字
kf := header[0]
if kf != _CONNECTION_PACKET_HEADER_KF {
return 0, ErrConnectionProtocolKf
}
// 检查载荷的总长度
my.length = int(binary.BigEndian.Uint32(header[1:]))
my.offset = 0
my.checksum = 0
if my.length < 1 {
return 0, ErrConnectionProtocolLength
}
}
// 循环收取数据到缓存区P之中
remain := my.length - my.offset
if length <= remain {
n, err = owner.connection.Read(p)
} else {
n, err = owner.connection.Read(p[:remain])
}
// 从链接之中读入数据出现错误
if err != nil {
return n, err
}
// 是否收取到FIN字节(0)
if n < 1 {
return n, ErrConnectionDisconnect
}
// 计算当前帧是否已经收取完毕
my.offset += n
if my.offset < my.length {
return n, nil
} else {
my.offset = 0
my.length = -1
my.checksum = 0
return n, nil
}
}
// 功能名:实例化一个侦听器
func NewListener(host string, port int) *Listener {
// 检查端口参数的有效性
if port <= CONNECTION_MIN_PORT || port > CONNECTION_MAX_PORT {
return nil
}
// 服务器主机地址不可为空
if len(host) < 1 {
return nil
}
// 服务器地址并且尝试绑定
address := host + ":" + strconv.Itoa(port)
listener, err := net.Listen("tcp", address)
if err != nil {
return nil
}
return &Listener{
disposed: false,
listener: listener,
connections: make(map[*Connection]bool),
}
}
// 功能名:侦听服务器
func (my *Listener) ListenAndServe(acceptor func(*Connection)) error {
// 接收器参数不可以为空
if acceptor == nil {
return ErrConnectionArgAcceptor
}
// 网络侦听器已经关闭
if my.disposed {
return ErrConnectionClosed
}
any := false
listener := my.listener
for {
// 网络如果已经被关闭了
if my.disposed {
return nil
}
// 尝试接收一个网络链接
conn, err := listener.Accept()
if err != nil {
if any {
return nil
} else {
return err
}
}
// 如果没有获取到链接的引用则迭代到下个链接接受
if conn == nil {
continue
}
// 构建一个封装的网络链接对象
connection := NewConnection(conn, my)
my.Lock()
my.connections[connection] = true
my.Unlock()
// 启动对于链接处理的协同程序
go acceptor(connection)
}
}
// 功能名:关闭全部链接
func (my *Listener) Close() {
// 强制关闭服务器的侦听器
listener := my.listener
if listener != nil {
listener.Close()
}
// 释放全部持有的托管资源
my.Lock()
my.disposed = true
connections := my.connections
my.connections = make(map[*Connection]bool)
my.Unlock()
// 强制关闭全部的网络链接
for connection := range connections {
connection.Close(false)
}
}
func test() {
rand.Seed(time.Now().UnixNano())
// 链接服务器
packet := 0
connection := Connect("127.0.0.1", 11111)
for i, c := 0, rand.Intn(100)+1; i < c; i++ {
length := rand.Intn(128) + 1
buffer := make([]byte, length)
for j := 0; j < length; j++ {
buffer[j] = byte(rand.Intn(26)) + 97
}
// 发送数据
transferred := connection.Send(buffer, 0, length)
if transferred < 1 {
break
} else {
// 接受数据
r := connection.Receive()
if r == nil {
break
}
// 读取全部数据(一帧)
buf, err := io.ReadAll(r)
if err != nil {
break
} else if len(buf) < 1 {
break
}
// 打印收到的帧数据
packet++
fmt.Printf("[%s]: client packet=%d length=%d string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), string(buf))
}
}
// 关闭链接
connection.Close(true)
// 客户端关闭网络链接
fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "client connection closed")
}
func main() {
// 运行客户端测试协程
go test()
// 打开服务器侦听器哟
listener := NewListener("127.0.0.1", 11111)
listener.ListenAndServe(func(c *Connection) {
packet := 0
remoteEP := c.GetRemoteEndPoint()
for {
// 获取网络接收器
r := c.Receive()
if r == nil {
break
}
// 读取全部数据(一帧)
buf, err := io.ReadAll(r)
if err != nil {
break
} else if len(buf) < 1 {
break
}
// 打印收到的帧数据
packet++
fmt.Printf("[%s]: server packet=%d length=%d remote=%s string:%s\r\n", time.Now().Format("2006-01-02 15:04:05"), packet, len(buf), remoteEP, string(buf))
// 回显客户端的数据
transferred := c.Send(buf, 0, len(buf))
if transferred < 1 {
break
}
}
// 关闭客户端链接
c.Close(true)
// 服务器关闭网络链接
fmt.Printf("[%s]: %s\r\n", time.Now().Format("2006-01-02 15:04:05"), "server connection closed")
})
}