252 lines
8.4 KiB
Python
Raw Normal View History

import _modbus
2022-09-19 00:48:04 +08:00
class ModBus(_modbus._ModBus):
    """
    A subclass of _modbus._ModBus that wraps the low-level serializers and
    deserializers with list/bytes based arguments and return values.

    The ``serialize*`` methods call the parent implementation, which returns
    the number of bytes it produced, and hand back that many leading bytes of
    ``self.sendBuff``. The ``deserialize*`` methods store the received message
    in ``self.readBuff``, call the parent implementation with the message
    length, and convert the returned buffer into a Python list.
    """

    def _unpackRegisters(self, dest) -> list:
        """Combine a byte sequence into a list of 16-bit register values.

        Each register is rebuilt from two consecutive bytes, low byte first
        (``dest[i] + dest[i + 1] * 256``).
        Args:
            dest: The byte buffer returned by one of the parent deserializers.
        Returns:
            list: One integer per pair of bytes in ``dest``.
        """
        ret = []
        for i in range(0, len(dest), 2):
            ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
        return ret

    def serializeWriteBits(self, addr: int, src: list) -> bytes:
        """Serialize a write multiple coils request.
        Args:
            addr (int): The starting address of the coils to be written.
            src (list): A list of boolean values (0 or 1) to be written to the coils.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeWriteBits(addr, len(src), bytes(src))
        return self.sendBuff[0:length]

    def serializeWriteRegisters(self, addr: int, src: list) -> bytes:
        """Serialize a write multiple registers request.
        Args:
            addr (int): The starting address of the registers to be written.
            src (list): A list of integer values (0-65535) to be written to the registers.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        # Build the payload in a list and convert once: a bytes object is
        # immutable in standard Python, so it cannot be filled in by index.
        # Each register contributes its low byte followed by its high byte.
        payload = []
        for value in src:
            payload.append(value % 256)
            payload.append(value // 256)
        length = super().serializeWriteRegisters(addr, len(src), bytes(payload))
        return self.sendBuff[0:length]

    def serializeReadBits(self, addr: int, nb: int) -> bytes:
        """Serialize a read coils request.
        Args:
            addr (int): The starting address of the coils to be read.
            nb (int): The number of coils to be read.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeReadBits(addr, nb)
        return self.sendBuff[0:length]

    def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
        """Serialize a read discrete inputs request.
        Args:
            addr (int): The starting address of the discrete inputs to be read.
            nb (int): The number of discrete inputs to be read.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeReadInputBits(addr, nb)
        return self.sendBuff[0:length]

    def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
        """Serialize a read holding registers request.
        Args:
            addr (int): The starting address of the holding registers to be read.
            nb (int): The number of holding registers to be read.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeReadRegisters(addr, nb)
        return self.sendBuff[0:length]

    def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
        """Serialize a read input registers request.
        Args:
            addr (int): The starting address of the input registers to be read.
            nb (int): The number of input registers to be read.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeReadInputRegisters(addr, nb)
        return self.sendBuff[0:length]

    def serializeWriteBit(self, addr: int, status: int) -> bytes:
        """Serialize a write single coil request.
        Args:
            addr (int): The address of the coil to be written.
            status (int): The value (0 or 1) to be written to the coil.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeWriteBit(addr, status)
        return self.sendBuff[0:length]

    def serializeWriteRegister(self, addr: int, value: int) -> bytes:
        """Serialize a write single register request.
        Args:
            addr (int): The address of the register to be written.
            value (int): The value (0-65535) to be written to the register.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeWriteRegister(addr, value)
        return self.sendBuff[0:length]

    def serializeMaskWriteRegister(self,
                                   addr: int,
                                   andMask: int,
                                   orMask: int) -> bytes:
        """Serialize a mask write register request.
        Args:
            addr (int): The address of the register to be modified.
            andMask (int): The AND mask to be applied to the current value of the register.
            orMask (int): The OR mask to be applied to the result of the AND operation.
        Returns:
            bytes: The serialized message as a bytes object.
        """
        length = super().serializeMaskWriteRegister(addr, andMask, orMask)
        return self.sendBuff[0:length]

    def serializeReportSlaveId(self) -> bytes:
        """Serialize a report slave ID request.
        Returns:
            bytes: The serialized message as a bytes object (the leading
                bytes of ``self.sendBuff``, like the other serializers).
        """
        length = super().serializeReportSlaveId()
        return self.sendBuff[0:length]

    def deserializeReadRegisters(self, msg: bytes) -> list:
        """Deserialize a read holding registers response.
        Args:
            msg (bytes): The received message as a bytes object.
        Returns:
            list: A list of integer values (0-65535) read from the registers,
                or None when the parent deserializer returns None.
        """
        self.readBuff = msg
        dest = super().deserializeReadRegisters(len(msg))
        if dest is None:
            return None
        return self._unpackRegisters(dest)

    def deserializeReadBits(self, msg: bytes) -> list:
        """Deserialize a read coils response.
        Args:
            msg (bytes): The received message as a bytes object.
        Returns:
            list: The buffer returned by the parent deserializer converted
                with ``list()`` (presumably one 0/1 entry per coil - confirm
                against ``_modbus``), or None when the parent returns None.
        """
        self.readBuff = msg
        dest = super().deserializeReadBits(len(msg))
        if dest is None:
            return None
        return list(dest)

    def deserializeReadInputBits(self, msg: bytes) -> list:
        """Deserialize a read discrete inputs response.
        Args:
            msg (bytes): The received message as a bytes object.
        Returns:
            list: The buffer returned by the parent deserializer converted
                with ``list()`` (presumably one 0/1 entry per input - confirm
                against ``_modbus``), or None when the parent returns None.
        """
        self.readBuff = msg
        dest = super().deserializeReadInputBits(len(msg))
        if dest is None:
            return None
        return list(dest)

    def deserializeReadInputRegisters(self, msg: bytes) -> list:
        """Deserialize a read input registers response.
        Args:
            msg (bytes): The received message as a bytes object.
        Returns:
            list: A list of integer values (0-65535) read from the input
                registers, or None when the parent deserializer returns None.
        """
        self.readBuff = msg
        dest = super().deserializeReadInputRegisters(len(msg))
        if dest is None:
            return None
        return self._unpackRegisters(dest)

    def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
        """Deserialize a write and read registers response.
        Args:
            msg (bytes): The received message as a bytes object.
        Returns:
            list: A list of integer values (0-65535) decoded from the buffer
                returned by the parent deserializer, or None when the parent
                returns None.
        """
        self.readBuff = msg
        dest = super().deserializeWriteAndReadRegisters(len(msg))
        if dest is None:
            return None
        return self._unpackRegisters(dest)
class ModBusRTU(ModBus):
    """ModBus variant that is set up through the inherited RTU initializer."""

    def __init__(self, sendBuffSize: int, readBuffSize: int):
        """Initialize a Modbus RTU protocol instance.

        Both sizes are forwarded unchanged to ``__init__rtu``.
        Args:
            sendBuffSize (int): The size of the send buffer in bytes.
            readBuffSize (int): The size of the read buffer in bytes.
        """
        # NOTE(review): under standard CPython an attribute written as
        # `self.__init__rtu` inside a class body is subject to private-name
        # mangling; this presumably relies on the target runtime resolving
        # the name verbatim - confirm before running on another interpreter.
        self.__init__rtu(sendBuffSize, readBuffSize)
class ModBusTCP(ModBus):
    """ModBus variant that is set up through the inherited TCP initializer."""

    def __init__(self, sendBuffSize: int, readBuffSize: int):
        """Initialize a Modbus TCP protocol instance.

        Both sizes are forwarded unchanged to ``__init__tcp``.
        Args:
            sendBuffSize (int): The size of the send buffer in bytes.
            readBuffSize (int): The size of the read buffer in bytes.
        """
        # NOTE(review): under standard CPython an attribute written as
        # `self.__init__tcp` inside a class body is subject to private-name
        # mangling; this presumably relies on the target runtime resolving
        # the name verbatim - confirm before running on another interpreter.
        self.__init__tcp(sendBuffSize, readBuffSize)