python 二次封装Modbus库实现设备间Modbus通信

发布时间:2024年01月16日

前言:

想想好久没有更博客了,今天忙中偷闲准备写一篇Modbus相关的分享。在进入工业领域之前我一直从事软件行业的测试工作,所以也没听过Modbus协议,但是自从进入西门子工作后正式进入了工业领域,所以对Modbus协议也就不陌生了。

在python中其实已经有了一个pymodbus的第三方模块了,该模块支持对Modbus通讯协议的读写操作,但是在实际使用过程中我们需要在robotframework中构建自动化测试流程,这时原有模块中的函数用起来不是那么顺手,所以就需要对pymodbus模块进行二次封装,封装成ModbusRobot库。

何为Modbus协议?

Modbus协议是什么?对于Modbus协议我特意找了几篇文章:?

文章1:?Modbus协议_海底001的博客-CSDN博客

文章2:?详解Modbus通信协议---清晰易懂_Z小旋的博客-CSDN博客_modbus通讯协议?

我觉得已经有人介绍的很清晰了,这里就不过多赘述了哈。

关于封装的介绍

1.建立/断开Modbus连接

	def open_modbus_connection(self, connectionType, **connectParams):
        # 建立modbus连接
		connectionType = ModbusConnectionType[connectionType] if isinstance(connectionType, str) else ModbusConnectionType(connectionType)
		clientID = None

		connectionClass = None
		connectParams.update({
			'timeout': float(connectParams.get('timeout', Defaults.Timeout)),
		})

		if 'unit' not in connectParams:
			unit = int(connectParams.pop('unit', 1))
		else:
			unit = int(connectParams.get('unit'))

		if connectionType in [ModbusConnectionType.TCP, ModbusConnectionType.UDP]:
			connectionTypeMapping = {
				ModbusConnectionType.TCP: {
					'connectionClass': ModbusTcpClient,
					'protocol': 'tcp',
				},
				ModbusConnectionType.UDP: {
					'connectionClass': ModbusUdpClient,
					'protocol': 'udp',
				},
			}

			if 'host' not in connectParams:
				raise ValueError('No host address given for Modbus {:s} connection'.format(connectionType.name))

			connectionClass = connectionTypeMapping[connectionType]['connectionClass']
			connectParams.update({
				'host': connectParams.get('host'),
				'port': int(connectParams.get('port', Defaults.Port)),
			})
			clientID = hashlib.sha1('{:s}://{!s}:{:d}'.format(
				connectionTypeMapping[connectionType]['protocol'],
				connectParams.get('host'),
				connectParams.get('port'),
			).encode('utf-8')).hexdigest()
		elif connectionType == ModbusConnectionType.RTU:
			if 'port' not in connectParams:
				raise ValueError('No COM port given for Modbus {:s} connection'.format(connectionType.name))

			connectionClass = ModbusSerialClient
			connectParams.update({
				'method': 'rtu',
				'port': connectParams.get('port'),
				'baudrate': int(connectParams.get('baudRate', 19200)),
				'parity': connectParams.get('parity', 'E'),
				'stopbits': int(connectParams.get('stopbits', Defaults.Stopbits)),
				'bytesize': int(connectParams.get('bytesize', Defaults.Bytesize)),
			})
			clientID = hashlib.sha1('{:s}'.format(connectParams.get('port')).encode('utf-8')).hexdigest()

		connectionID = '{:s}-{:d}'.format(clientID, unit)

		if clientID not in self._clients:
			self._clients[clientID] = {
				'client': connectionClass(**connectParams),
				'counter': 0,
			}

		if connectionID not in self._connections:
			self._connections[connectionID] = {
				'clientID': clientID,
				'unit': unit,
				'type': connectionType,
			}
			self._clients[clientID]['counter'] += 1

		self._active_connection = connectionID

		return connectionID
	def close_modbus_connection(self, connectionID=None):
        # 断开Modbus连接
		connectionID = self._check_connectionID(connectionID)

		self._clients[self._connections[connectionID]['clientID']]['counter'] -= 1

		if self._clients[self._connections[connectionID]['clientID']]['counter'] <= 0:
			self._clients[self._connections[connectionID]['clientID']]['client'].close()

			del self._clients[self._connections[connectionID]['clientID']]['client']
			del self._clients[self._connections[connectionID]['clientID']]

		del self._connections[connectionID]

		self._active_connection = None if not self._connections else next(iter(self._connections.keys()))

		return self._active_connection

2.根据数据类型读取Modbus数据

	def read_holding_string(self, address, maxLength, connectionID=None):
		connectionID = self._check_connectionID(connectionID)
		response = self._clients[self._connections[connectionID]['clientID']]['client'].read_holding_registers(int(address), int(maxLength / 2), unit=self._connections[connectionID]['unit']) # type: ReadHoldingRegistersResponse

		self._check_respose(response)

		s = struct.pack('!{:s}'.format('H' * len(response.registers)), *response.registers)
		i = -1

		try:
			i = s.index(b'\x00')

			return s[:i].decode('ascii')
		except ValueError:
			return s.decode('ascii')

	def read_holding_float(self, address, connectionID=None):
		connectionID = self._check_connectionID(connectionID)
		response = self._clients[self._connections[connectionID]['clientID']]['client'].read_holding_registers(int(address), 2, unit=self._connections[connectionID]['unit']) # type: ReadHoldingRegistersResponse

		self._check_respose(response)

		return struct.unpack('!f', struct.pack('!HH', *response.registers))[0]

	def read_holding_double(self, address, connectionID=None):
		connectionID = self._check_connectionID(connectionID)
		response = self._clients[self._connections[connectionID]['clientID']]['client'].read_holding_registers(int(address), 4, unit=self._connections[connectionID]['unit']) # type: ReadHoldingRegistersResponse

		self._check_respose(response)

		return struct.unpack('!d', struct.pack('!HHHH', *response.registers))[0]

3.通过Modbus向设备寄存器写数据

	def write_holding_registers(self, address, values, connectionID=None):
		connectionID = self._check_connectionID(connectionID)
		response = self._clients[self._connections[connectionID]['clientID']]['client'].write_registers(int(address), values, unit=self._connections[connectionID]['unit'])

		self._check_respose(response)

		return response

二次封装的库发布

封装完成可以打包成一个可以发布的三方库,结构如下:

下图是结构的介绍,关于打包的方法参考:python自定义包的发布与安装_python自定义包的安装与发布-CSDN博客

?总结

封装完成后可以作为一个单独的三方库,供开发其他功能调用。需要详细的源码可以私信我!

文章来源:https://blog.csdn.net/xgh1951/article/details/127324263
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。