mirror of
https://gitee.com/Lyon1998/pikapython.git
synced 2025-01-29 17:22:56 +08:00
add comment for modbus.py, from AI
This commit is contained in:
parent
57a04ca87e
commit
c01775b39a
@ -2,12 +2,33 @@ import _modbus
|
|||||||
|
|
||||||
|
|
||||||
class ModBus(_modbus._ModBus):
|
class ModBus(_modbus._ModBus):
|
||||||
|
"""
|
||||||
|
A subclass of _modbus._ModBus that provides methods for serializing and sending modbus messages.
|
||||||
|
"""
|
||||||
|
|
||||||
def serializeWriteBits(self, addr: int, src: list) -> bytes:
|
def serializeWriteBits(self, addr: int, src: list) -> bytes:
|
||||||
|
"""Serialize a write multiple coils request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the coils to be written.
|
||||||
|
src (list): A list of boolean values (0 or 1) to be written to the coils.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeWriteBits(addr, len(src), bytes(src))
|
lenth = super().serializeWriteBits(addr, len(src), bytes(src))
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeWriteRegisters(self, addr: int, src: list) -> bytes:
|
def serializeWriteRegisters(self, addr: int, src: list) -> bytes:
|
||||||
|
"""Serialize a write multiple registers request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the registers to be written.
|
||||||
|
src (list): A list of integer values (0-65535) to be written to the registers.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
_src = bytes(2 * len(src))
|
_src = bytes(2 * len(src))
|
||||||
for i in range(len(src)):
|
for i in range(len(src)):
|
||||||
_src[2 * i] = src[i] % 256
|
_src[2 * i] = src[i] % 256
|
||||||
@ -16,26 +37,80 @@ class ModBus(_modbus._ModBus):
|
|||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadBits(self, addr: int, nb: int) -> bytes:
|
def serializeReadBits(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read coils request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the coils to be read.
|
||||||
|
nb (int): The number of coils to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadBits(addr, nb)
|
lenth = super().serializeReadBits(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
|
def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read discrete inputs request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the discrete inputs to be read.
|
||||||
|
nb (int): The number of discrete inputs to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadInputBits(addr, nb)
|
lenth = super().serializeReadInputBits(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
|
def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read holding registers request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the holding registers to be read.
|
||||||
|
nb (int): The number of holding registers to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadRegisters(addr, nb)
|
lenth = super().serializeReadRegisters(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
|
def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read input registers request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the input registers to be read.
|
||||||
|
nb (int): The number of input registers to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadInputRegisters(addr, nb)
|
lenth = super().serializeReadInputRegisters(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeWriteBit(self, addr: int, status: int) -> bytes:
|
def serializeWriteBit(self, addr: int, status: int) -> bytes:
|
||||||
|
"""Serialize a write single coil request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The address of the coil to be written.
|
||||||
|
status (int): The value (0 or 1) to be written to the coil.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeWriteBit(addr, status)
|
lenth = super().serializeWriteBit(addr, status)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeWriteRegister(self, addr: int, value: int) -> bytes:
|
def serializeWriteRegister(self, addr: int, value: int) -> bytes:
|
||||||
|
"""Serialize a write single register request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The address of the register to be written.
|
||||||
|
value (int): The value (0-65535) to be written to the register.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeWriteRegister(addr, value)
|
lenth = super().serializeWriteRegister(addr, value)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
@ -43,14 +118,37 @@ class ModBus(_modbus._ModBus):
|
|||||||
addr: int,
|
addr: int,
|
||||||
andMask: int,
|
andMask: int,
|
||||||
orMask: int) -> bytes:
|
orMask: int) -> bytes:
|
||||||
|
"""Serialize a mask write register request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The address of the register to be modified.
|
||||||
|
andMask (int): The AND mask to be applied to the current value of the register.
|
||||||
|
orMask (int): The OR mask to be applied to the result of the AND operation.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeMaskWriteRegister(addr, andMask, orMask)
|
lenth = super().serializeMaskWriteRegister(addr, andMask, orMask)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReportSlaveId(self) -> int:
|
def serializeReportSlaveId(self) -> int:
|
||||||
|
"""Serialize a report slave ID request.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The length of the serialized message in bytes.
|
||||||
|
"""
|
||||||
lenth = super().serializeReportSlaveId()
|
lenth = super().serializeReportSlaveId()
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def deserializeReadRegisters(self, msg: bytes) -> list:
|
def deserializeReadRegisters(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read holding registers response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of integer values (0-65535) read from the registers.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
dest = super().deserializeReadRegisters(len(msg))
|
dest = super().deserializeReadRegisters(len(msg))
|
||||||
ret = []
|
ret = []
|
||||||
@ -59,18 +157,42 @@ class ModBus(_modbus._ModBus):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
def deserializeReadBits(self, msg: bytes) -> list:
|
def deserializeReadBits(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read coils response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of boolean values (True or False) read from the coils.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeReadBits(length)
|
dest = super().deserializeReadBits(length)
|
||||||
return list(dest)
|
return list(dest)
|
||||||
|
|
||||||
def deserializeReadInputBits(self, msg: bytes) -> list:
|
def deserializeReadInputBits(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read discrete inputs response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of boolean values (True or False) read from the discrete inputs.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeReadInputBits(length)
|
dest = super().deserializeReadInputBits(length)
|
||||||
return list(dest)
|
return list(dest)
|
||||||
|
|
||||||
def deserializeReadInputRegisters(self, msg: bytes) -> list:
|
def deserializeReadInputRegisters(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read input registers response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of integer values (0-65535) read from the input registers.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeReadInputRegisters(length)
|
dest = super().deserializeReadInputRegisters(length)
|
||||||
@ -80,6 +202,14 @@ class ModBus(_modbus._ModBus):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
|
def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a write and read registers response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of integer values (0-65535) written to and read from the registers.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeWriteAndReadRegisters(length)
|
dest = super().deserializeWriteAndReadRegisters(length)
|
||||||
@ -91,9 +221,21 @@ class ModBus(_modbus._ModBus):
|
|||||||
|
|
||||||
class ModBusRTU(ModBus):
|
class ModBusRTU(ModBus):
|
||||||
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
||||||
|
"""Initialize a Modbus RTU protocol instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sendBuffSize (int): The size of the send buffer in bytes.
|
||||||
|
readBuffSize (int): The size of the read buffer in bytes.
|
||||||
|
"""
|
||||||
self.__init__rtu(sendBuffSize, readBuffSize)
|
self.__init__rtu(sendBuffSize, readBuffSize)
|
||||||
|
|
||||||
|
|
||||||
class ModBusTCP(ModBus):
|
class ModBusTCP(ModBus):
|
||||||
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
||||||
|
"""Initialize a Modbus TCP protocol instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sendBuffSize (int): The size of the send buffer in bytes.
|
||||||
|
readBuffSize (int): The size of the read buffer in bytes.
|
||||||
|
"""
|
||||||
self.__init__tcp(sendBuffSize, readBuffSize)
|
self.__init__tcp(sendBuffSize, readBuffSize)
|
||||||
|
@ -2,12 +2,33 @@ import _modbus
|
|||||||
|
|
||||||
|
|
||||||
class ModBus(_modbus._ModBus):
|
class ModBus(_modbus._ModBus):
|
||||||
|
"""
|
||||||
|
A subclass of _modbus._ModBus that provides methods for serializing and sending modbus messages.
|
||||||
|
"""
|
||||||
|
|
||||||
def serializeWriteBits(self, addr: int, src: list) -> bytes:
|
def serializeWriteBits(self, addr: int, src: list) -> bytes:
|
||||||
|
"""Serialize a write multiple coils request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the coils to be written.
|
||||||
|
src (list): A list of boolean values (0 or 1) to be written to the coils.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeWriteBits(addr, len(src), bytes(src))
|
lenth = super().serializeWriteBits(addr, len(src), bytes(src))
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeWriteRegisters(self, addr: int, src: list) -> bytes:
|
def serializeWriteRegisters(self, addr: int, src: list) -> bytes:
|
||||||
|
"""Serialize a write multiple registers request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the registers to be written.
|
||||||
|
src (list): A list of integer values (0-65535) to be written to the registers.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
_src = bytes(2 * len(src))
|
_src = bytes(2 * len(src))
|
||||||
for i in range(len(src)):
|
for i in range(len(src)):
|
||||||
_src[2 * i] = src[i] % 256
|
_src[2 * i] = src[i] % 256
|
||||||
@ -16,26 +37,80 @@ class ModBus(_modbus._ModBus):
|
|||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadBits(self, addr: int, nb: int) -> bytes:
|
def serializeReadBits(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read coils request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the coils to be read.
|
||||||
|
nb (int): The number of coils to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadBits(addr, nb)
|
lenth = super().serializeReadBits(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
|
def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read discrete inputs request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the discrete inputs to be read.
|
||||||
|
nb (int): The number of discrete inputs to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadInputBits(addr, nb)
|
lenth = super().serializeReadInputBits(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
|
def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read holding registers request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the holding registers to be read.
|
||||||
|
nb (int): The number of holding registers to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadRegisters(addr, nb)
|
lenth = super().serializeReadRegisters(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
|
def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
|
||||||
|
"""Serialize a read input registers request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The starting address of the input registers to be read.
|
||||||
|
nb (int): The number of input registers to be read.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeReadInputRegisters(addr, nb)
|
lenth = super().serializeReadInputRegisters(addr, nb)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeWriteBit(self, addr: int, status: int) -> bytes:
|
def serializeWriteBit(self, addr: int, status: int) -> bytes:
|
||||||
|
"""Serialize a write single coil request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The address of the coil to be written.
|
||||||
|
status (int): The value (0 or 1) to be written to the coil.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeWriteBit(addr, status)
|
lenth = super().serializeWriteBit(addr, status)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeWriteRegister(self, addr: int, value: int) -> bytes:
|
def serializeWriteRegister(self, addr: int, value: int) -> bytes:
|
||||||
|
"""Serialize a write single register request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The address of the register to be written.
|
||||||
|
value (int): The value (0-65535) to be written to the register.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeWriteRegister(addr, value)
|
lenth = super().serializeWriteRegister(addr, value)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
@ -43,14 +118,37 @@ class ModBus(_modbus._ModBus):
|
|||||||
addr: int,
|
addr: int,
|
||||||
andMask: int,
|
andMask: int,
|
||||||
orMask: int) -> bytes:
|
orMask: int) -> bytes:
|
||||||
|
"""Serialize a mask write register request.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
addr (int): The address of the register to be modified.
|
||||||
|
andMask (int): The AND mask to be applied to the current value of the register.
|
||||||
|
orMask (int): The OR mask to be applied to the result of the AND operation.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The serialized message as a bytes object.
|
||||||
|
"""
|
||||||
lenth = super().serializeMaskWriteRegister(addr, andMask, orMask)
|
lenth = super().serializeMaskWriteRegister(addr, andMask, orMask)
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def serializeReportSlaveId(self) -> int:
|
def serializeReportSlaveId(self) -> int:
|
||||||
|
"""Serialize a report slave ID request.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The length of the serialized message in bytes.
|
||||||
|
"""
|
||||||
lenth = super().serializeReportSlaveId()
|
lenth = super().serializeReportSlaveId()
|
||||||
return self.sendBuff[0:lenth]
|
return self.sendBuff[0:lenth]
|
||||||
|
|
||||||
def deserializeReadRegisters(self, msg: bytes) -> list:
|
def deserializeReadRegisters(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read holding registers response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of integer values (0-65535) read from the registers.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
dest = super().deserializeReadRegisters(len(msg))
|
dest = super().deserializeReadRegisters(len(msg))
|
||||||
ret = []
|
ret = []
|
||||||
@ -59,18 +157,42 @@ class ModBus(_modbus._ModBus):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
def deserializeReadBits(self, msg: bytes) -> list:
|
def deserializeReadBits(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read coils response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of boolean values (True or False) read from the coils.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeReadBits(length)
|
dest = super().deserializeReadBits(length)
|
||||||
return list(dest)
|
return list(dest)
|
||||||
|
|
||||||
def deserializeReadInputBits(self, msg: bytes) -> list:
|
def deserializeReadInputBits(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read discrete inputs response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of boolean values (True or False) read from the discrete inputs.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeReadInputBits(length)
|
dest = super().deserializeReadInputBits(length)
|
||||||
return list(dest)
|
return list(dest)
|
||||||
|
|
||||||
def deserializeReadInputRegisters(self, msg: bytes) -> list:
|
def deserializeReadInputRegisters(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a read input registers response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of integer values (0-65535) read from the input registers.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeReadInputRegisters(length)
|
dest = super().deserializeReadInputRegisters(length)
|
||||||
@ -80,6 +202,14 @@ class ModBus(_modbus._ModBus):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
|
def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
|
||||||
|
"""Deserialize a write and read registers response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg (bytes): The received message as a bytes object.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of integer values (0-65535) written to and read from the registers.
|
||||||
|
"""
|
||||||
self.readBuff = msg
|
self.readBuff = msg
|
||||||
length = len(msg)
|
length = len(msg)
|
||||||
dest = super().deserializeWriteAndReadRegisters(length)
|
dest = super().deserializeWriteAndReadRegisters(length)
|
||||||
@ -91,9 +221,21 @@ class ModBus(_modbus._ModBus):
|
|||||||
|
|
||||||
class ModBusRTU(ModBus):
|
class ModBusRTU(ModBus):
|
||||||
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
||||||
|
"""Initialize a Modbus RTU protocol instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sendBuffSize (int): The size of the send buffer in bytes.
|
||||||
|
readBuffSize (int): The size of the read buffer in bytes.
|
||||||
|
"""
|
||||||
self.__init__rtu(sendBuffSize, readBuffSize)
|
self.__init__rtu(sendBuffSize, readBuffSize)
|
||||||
|
|
||||||
|
|
||||||
class ModBusTCP(ModBus):
|
class ModBusTCP(ModBus):
|
||||||
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
def __init__(self, sendBuffSize: int, readBuffSize: int):
|
||||||
|
"""Initialize a Modbus TCP protocol instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sendBuffSize (int): The size of the send buffer in bytes.
|
||||||
|
readBuffSize (int): The size of the read buffer in bytes.
|
||||||
|
"""
|
||||||
self.__init__tcp(sendBuffSize, readBuffSize)
|
self.__init__tcp(sendBuffSize, readBuffSize)
|
||||||
|
@ -2507,14 +2507,14 @@ static uint8_t Parser_checkIsMultiComment(char* line) {
|
|||||||
if (!((line[i + 1] == line[i]) && (line[i + 2] == line[i]))) {
|
if (!((line[i + 1] == line[i]) && (line[i + 2] == line[i]))) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/* check char befor the ''' or """ */
|
// /* check char befor the ''' or """ */
|
||||||
if (!((0 == i) || (line[i - 1] == ' '))) {
|
// if (!((0 == i) || (line[i - 1] == ' '))) {
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
/* check char after the ''' or """ */
|
// /* check char after the ''' or """ */
|
||||||
if (!((line[i + 3] == ' ') || (line[i + 3] == 0))) {
|
// if (!((line[i + 3] == ' ') || (line[i + 3] == 0))) {
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
/* mached */
|
/* mached */
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -2619,7 +2619,10 @@ static char* _Parser_linesToBytesOrAsm(Args* outBuffs,
|
|||||||
parse_after:
|
parse_after:
|
||||||
if (NULL == single_ASM) {
|
if (NULL == single_ASM) {
|
||||||
out_ASM = NULL;
|
out_ASM = NULL;
|
||||||
pika_platform_printf("------\r\n%s\r\n------\r\n", line);
|
pika_platform_printf(
|
||||||
|
"----------[%d]----------\r\n%s\r\n-------------------------"
|
||||||
|
"\r\n",
|
||||||
|
lines_index, line);
|
||||||
strsDeinit(&buffs);
|
strsDeinit(&buffs);
|
||||||
goto exit;
|
goto exit;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user