add test for modbus parse error

This commit is contained in:
pikastech 2023-04-22 13:29:10 +08:00
parent 95903f6815
commit ce0999ec95
5 changed files with 63 additions and 0 deletions

View File

@ -0,0 +1,20 @@
# Import modbus module
import modbus
# Create a ModBusRTU object, specify the send buffer and receive buffer size as 128 bytes
mb = modbus.ModBusRTU(128, 128)
# Set slave address to 1
mb.setSlave(1)
# Generate a request frame for reading registers, specify the start address as 0 and the quantity as 10
send_buff = mb.serializeReadRegisters(0, 10)
# Print the byte string of the request frame
print(send_buff)
# Parse a response frame for reading registers, return a list containing the values of the registers
host_regists = mb.deserializeReadRegisters(
b'\x01\x03\x14\x00\x00\x04\xD2\x00\x00\x00\x00\x00\x7B\x00\x00\x00\x00\x00\x00\x00\x00\xE5\x0B'
)
print(host_regists)

View File

@ -151,6 +151,8 @@ class ModBus(_modbus._ModBus):
"""
self.readBuff = msg
dest = super().deserializeReadRegisters(len(msg))
if dest is None:
return None
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
@ -168,6 +170,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadBits(length)
if dest is None:
return None
return list(dest)
def deserializeReadInputBits(self, msg: bytes) -> list:
@ -182,6 +186,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputBits(length)
if dest is None:
return None
return list(dest)
def deserializeReadInputRegisters(self, msg: bytes) -> list:
@ -196,6 +202,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputRegisters(length)
if dest is None:
return None
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
@ -213,6 +221,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeWriteAndReadRegisters(length)
if dest is None:
return None
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)

View File

@ -151,6 +151,8 @@ class ModBus(_modbus._ModBus):
"""
self.readBuff = msg
dest = super().deserializeReadRegisters(len(msg))
if dest is None:
return None
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
@ -168,6 +170,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadBits(length)
if dest is None:
return None
return list(dest)
def deserializeReadInputBits(self, msg: bytes) -> list:
@ -182,6 +186,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputBits(length)
if dest is None:
return None
return list(dest)
def deserializeReadInputRegisters(self, msg: bytes) -> list:
@ -196,6 +202,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputRegisters(length)
if dest is None:
return None
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
@ -213,6 +221,8 @@ class ModBus(_modbus._ModBus):
self.readBuff = msg
length = len(msg)
dest = super().deserializeWriteAndReadRegisters(length)
if dest is None:
return None
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)

View File

@ -591,6 +591,9 @@ TEST(module, REPL_stdtask) {
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}
TEST_SINGLE_FILE(modbus, rtu_master_err, "test/python/modbus/rtu_master_err.py")
#endif
TEST_END

View File

@ -0,0 +1,20 @@
# Import modbus module
import modbus
# Create a ModBusRTU object, specify the send buffer and receive buffer size as 128 bytes
mb = modbus.ModBusRTU(128, 128)
# Set slave address to 1
mb.setSlave(1)
# Generate a request frame for reading registers, specify the start address as 0 and the quantity as 10
send_buff = mb.serializeReadRegisters(0, 10)
# Print the byte string of the request frame
print(send_buff)
# Parse a response frame for reading registers, return a list containing the values of the registers
host_regists = mb.deserializeReadRegisters(
b'\x01\x03\x14\x00\x00\x04\xD2\x00\x00\x00\x00\x00\x7B\x00\x00\x00\x00\x00\x00\x00\x00\xE5\x0B'
)
print(host_regists)