想想好久没有更博客了,今天忙中偷闲准备写一篇Modbus相关的分享。在进入工业领域之前我一直从事软件行业的测试工作,所以也没听过Modbus协议,但是自从进入西门子工作后正式进入了工业领域,所以对Modbus协议也就不陌生了。
在python中其实已经有了一个pymodbus的第三方模块了,该模块支持对Modbus通讯协议的读写操作,但是在实际使用过程中我们需要在robotframework中构建自动化测试流程,这时原有模块中的函数用起来不是那么顺手,所以就需要对pymodbus模块进行二次封装,封装成ModbusRobot库。
Modbus协议是什么?对于Modbus协议我特意找了几篇文章:?
文章2:?详解Modbus通信协议---清晰易懂_Z小旋的博客-CSDN博客_modbus通讯协议?
我觉得已经有人介绍的很清晰了,这里就不过多赘述了哈。
def open_modbus_connection(self, connectionType, **connectParams):
# 建立modbus连接
connectionType = ModbusConnectionType[connectionType] if isinstance(connectionType, str) else ModbusConnectionType(connectionType)
clientID = None
connectionClass = None
connectParams.update({
'timeout': float(connectParams.get('timeout', Defaults.Timeout)),
})
if 'unit' not in connectParams:
unit = int(connectParams.pop('unit', 1))
else:
unit = int(connectParams.get('unit'))
if connectionType in [ModbusConnectionType.TCP, ModbusConnectionType.UDP]:
connectionTypeMapping = {
ModbusConnectionType.TCP: {
'connectionClass': ModbusTcpClient,
'protocol': 'tcp',
},
ModbusConnectionType.UDP: {
'connectionClass': ModbusUdpClient,
'protocol': 'udp',
},
}
if 'host' not in connectParams:
raise ValueError('No host address given for Modbus {:s} connection'.format(connectionType.name))
connectionClass = connectionTypeMapping[connectionType]['connectionClass']
connectParams.update({
'host': connectParams.get('host'),
'port': int(connectParams.get('port', Defaults.Port)),
})
clientID = hashlib.sha1('{:s}://{!s}:{:d}'.format(
connectionTypeMapping[connectionType]['protocol'],
connectParams.get('host'),
connectParams.get('port'),
).encode('utf-8')).hexdigest()
elif connectionType == ModbusConnectionType.RTU:
if 'port' not in connectParams:
raise ValueError('No COM port given for Modbus {:s} connection'.format(connectionType.name))
connectionClass = ModbusSerialClient
connectParams.update({
'method': 'rtu',
'port': connectParams.get('port'),
'baudrate': int(connectParams.get('baudRate', 19200)),
'parity': connectParams.get('parity', 'E'),
'stopbits': int(connectParams.get('stopbits', Defaults.Stopbits)),
'bytesize': int(connectParams.get('bytesize', Defaults.Bytesize)),
})
clientID = hashlib.sha1('{:s}'.format(connectParams.get('port')).encode('utf-8')).hexdigest()
connectionID = '{:s}-{:d}'.format(clientID, unit)
if clientID not in self._clients:
self._clients[clientID] = {
'client': connectionClass(**connectParams),
'counter': 0,
}
if connectionID not in self._connections:
self._connections[connectionID] = {
'clientID': clientID,
'unit': unit,
'type': connectionType,
}
self._clients[clientID]['counter'] += 1
self._active_connection = connectionID
return connectionID
def close_modbus_connection(self, connectionID=None):
# 断开Modbus连接
connectionID = self._check_connectionID(connectionID)
self._clients[self._connections[connectionID]['clientID']]['counter'] -= 1
if self._clients[self._connections[connectionID]['clientID']]['counter'] <= 0:
self._clients[self._connections[connectionID]['clientID']]['client'].close()
del self._clients[self._connections[connectionID]['clientID']]['client']
del self._clients[self._connections[connectionID]['clientID']]
del self._connections[connectionID]
self._active_connection = None if not self._connections else next(iter(self._connections.keys()))
return self._active_connection
def read_holding_string(self, address, maxLength, connectionID=None):
connectionID = self._check_connectionID(connectionID)
response = self._clients[self._connections[connectionID]['clientID']]['client'].read_holding_registers(int(address), int(maxLength / 2), unit=self._connections[connectionID]['unit']) # type: ReadHoldingRegistersResponse
self._check_respose(response)
s = struct.pack('!{:s}'.format('H' * len(response.registers)), *response.registers)
i = -1
try:
i = s.index(b'\x00')
return s[:i].decode('ascii')
except ValueError:
return s.decode('ascii')
def read_holding_float(self, address, connectionID=None):
connectionID = self._check_connectionID(connectionID)
response = self._clients[self._connections[connectionID]['clientID']]['client'].read_holding_registers(int(address), 2, unit=self._connections[connectionID]['unit']) # type: ReadHoldingRegistersResponse
self._check_respose(response)
return struct.unpack('!f', struct.pack('!HH', *response.registers))[0]
def read_holding_double(self, address, connectionID=None):
connectionID = self._check_connectionID(connectionID)
response = self._clients[self._connections[connectionID]['clientID']]['client'].read_holding_registers(int(address), 4, unit=self._connections[connectionID]['unit']) # type: ReadHoldingRegistersResponse
self._check_respose(response)
return struct.unpack('!d', struct.pack('!HHHH', *response.registers))[0]
def write_holding_registers(self, address, values, connectionID=None):
connectionID = self._check_connectionID(connectionID)
response = self._clients[self._connections[connectionID]['clientID']]['client'].write_registers(int(address), values, unit=self._connections[connectionID]['unit'])
self._check_respose(response)
return response
封装完成可以打包成一个可以发布的三方库,结构如下:
下图是结构的介绍,关于打包的方法参考:python自定义包的发布与安装_python自定义包的安装与发布-CSDN博客
封装完成后可以作为一个单独的三方库,供开发其他功能调用。需要详细的源码可以私信我!