mirror of
https://gitee.com/Lyon1998/pikapython.git
synced 2025-01-15 17:02:53 +08:00
60 lines
1.9 KiB
Python
60 lines
1.9 KiB
Python
import _modbus
|
|
|
|
"""
|
|
class ModBus(_modbus._ModBus):
    # Convenience layer over the native _modbus._ModBus binding.
    #
    # The deserialize* wrappers allocate the destination buffer, pass it to
    # the base-class method of the same name, and return its contents as a
    # list of ints.  The serializeWrite* wrappers pack a list of ints into a
    # byte buffer before delegating to the base class.
    #
    # '#' comments are used instead of docstrings because this code sits
    # between triple-quote delimiters in the file.

    def _registersFromBytes(self, data) -> list:
        # Combine each consecutive byte pair of `data` into one int, low byte
        # first: data[i] + data[i + 1] * 256.  `data` must have even length.
        regs = []
        for i in range(0, len(data), 2):
            regs.append(data[i] + data[i + 1] * 256)
        return regs

    def deserializeReadBits(self, msgLength: int) -> list:
        # Returns a list of msgLength ints, one per byte of the buffer handed
        # to the base-class call (which is expected to fill it in place).
        dest = bytes(msgLength)
        super().deserializeReadBits(msgLength, dest)
        return list(dest)

    def deserializeReadInputBits(self, msgLength: int) -> list:
        # Same contract as deserializeReadBits, using the base-class
        # deserializeReadInputBits call.
        dest = bytes(msgLength)
        super().deserializeReadInputBits(msgLength, dest)
        return list(dest)

    def deserializeReadRegisters(self, msgLength: int) -> list:
        # Two bytes are reserved per register; returns msgLength ints.
        dest = bytes(2 * msgLength)
        super().deserializeReadRegisters(msgLength, dest)
        return self._registersFromBytes(dest)

    def deserializeReadInputRegisters(self, msgLength: int) -> list:
        # Two bytes are reserved per register; returns msgLength ints.
        dest = bytes(2 * msgLength)
        super().deserializeReadInputRegisters(msgLength, dest)
        return self._registersFromBytes(dest)

    def serializeWriteBits(self, addr: int, nb: int, src: list):
        # Pack the list of ints into a bytes object and return whatever the
        # base-class serializeWriteBits returns.
        packed = bytes(src)
        return super().serializeWriteBits(addr, nb, packed)

    def serializeWriteRegisters(self, addr: int, nb: int, src: list):
        # Pack every value of `src` into two bytes, low byte first, and return
        # whatever the base-class serializeWriteRegisters returns.
        # NOTE(review): this relies on the runtime allowing item assignment on
        # a bytes object (CPython raises TypeError here) -- confirm the target
        # interpreter before porting.
        # NOTE(review): `nb` is forwarded unchecked while the buffer is sized
        # from len(src); confirm callers always pass nb <= len(src).
        _src = bytes(2 * len(src))
        for i in range(len(src)):
            _src[2 * i] = src[i] % 256
            _src[2 * i + 1] = src[i] // 256
        return super().serializeWriteRegisters(addr, nb, _src)

    def deserializeWriteAndReadRegisters(self, msgLength: int) -> list:
        # Two bytes are reserved per register; returns msgLength ints.
        dest = bytes(2 * msgLength)
        super().deserializeWriteAndReadRegisters(msgLength, dest)
        return self._registersFromBytes(dest)
|
|
|
|
|
|
class ModBusRTU(ModBus):
    # ModBus variant whose constructor runs the RTU-specific initialiser.

    def __init__(self, sendBuffSize: int, readBuffSize: int):
        # Both buffer sizes are forwarded unchanged to __init__rtu, which is
        # not defined in this class (presumably supplied by the native base
        # class -- confirm).
        self.__init__rtu(sendBuffSize, readBuffSize)
|
|
|
|
|
|
class ModBusTCP(ModBus):
    # ModBus variant whose constructor runs the TCP-specific initialiser.

    def __init__(self, sendBuffSize: int, readBuffSize: int):
        # Both buffer sizes are forwarded unchanged to __init__tcp, which is
        # not defined in this class (presumably supplied by the native base
        # class -- confirm).
        self.__init__tcp(sendBuffSize, readBuffSize)
|
|
"""
|