add demo for modbus_rt

This commit is contained in:
SenySunny 2024-03-15 17:19:53 +08:00
parent 2a7ac7244f
commit 2af9b13fdc
12 changed files with 211 additions and 0 deletions

View File

@ -0,0 +1,11 @@
import modbus_rt
import modbus_rt_defines as cst
serial_name = "COM11"
ascii_m = modbus_rt.rtu(cst.MASTER)
ascii_m.set_serial(serial_name)
ascii_m.open()
# ascii_m.excuse(1,3,0,5)
# ascii_m.excuse(1,16,0,5,[21,23,24,25,65])
# ascii_m.excuse(1,3,0,5)

View File

@ -0,0 +1,12 @@
import modbus_rt
import modbus_rt_defines as cst
serial_name = "COM11"
ascii_s = modbus_rt.ascii()
ascii_s.set_serial(serial_name)
ascii_s.add_block("A",cst.REGISTERS, 0, 10)
ascii_s.open()
# ascii_s.excuse(0,4,0,5)
# ascii_s.excuse(1,4,0,5,[1,2,34,5,6])
# ascii_s.excuse(0,4,0,5)

View File

@ -0,0 +1,41 @@
import modbus_rt
import modbus_rt_defines as cst
serial_name = "uart4"
ip_addr = ""
rm = modbus_rt.rtu(cst.MASTER)
rm.set_serial(serial_name)
rm.open()
ts = modbus_rt.tcp()
ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
def pre_call(evt) :
slave = evt.slave
function = evt.function
addr = evt.addr
quantity = evt.quantity
if cst.READ_DISCRETE_INPUTS == function:
if addr >= 0 and addr <= 16 :
data = rm.excuse(slave, function, addr + 10000, quantity)
ts.excuse(cst.WRITE, cst.INPUTS, addr, quantity, data)
elif cst.READ_COILS == function:
if addr >= 0 and addr <= 16 :
data = rm.excuse(slave, function, addr + 20000, quantity)
ts.excuse(cst.WRITE, cst.CIOLS, addr, quantity, data)
def done_call(evt) :
slave = evt.slave
function = evt.function
addr = evt.addr
quantity = evt.quantity
if cst.WRITE_SINGLE_COIL == function:
if addr >= 0 and addr <= 16 :
data = ts.excuse(cst.READ, cst.CIOLS, addr, 1)
rm.excuse(slave, function, addr + 20000, data[0])
elif cst.WRITE_MULTIPLE_COILS == function:
if addr >= 0 and addr <= 16 :
data = ts.excuse(cst.READ, cst.CIOLS, addr, quantity)
rm.excuse(slave, function, addr + 20000, quantity, data)
ts.set_strict(0)
ts.set_pre_ans_callback(pre_call)
ts.set_done_callback(done_call)
ts.open()

View File

@ -0,0 +1,11 @@
import modbus_rt
import modbus_rt_defines as cst
serial_name = "COM11"
rm = modbus_rt.rtu(cst.MASTER)
rm.set_serial(serial_name)
rm.open()
# rm.excuse(1,3,0,5)
# rm.excuse(1,16,0,5,[21,23,24,25,65])
# rm.excuse(1,3,0,5)

View File

@ -0,0 +1,12 @@
import modbus_rt
import modbus_rt_defines as cst
serial_name = "uart1"
rs = modbus_rt.rtu()
rs.set_serial(serial_name)
rs.add_block("A",cst.REGISTERS, 0, 10)
rs.open()
# rs.excuse(0,4,0,5)
# rs.excuse(1,4,0,5,[1,2,34,5,6])
# rs.excuse(0,4,0,5)

View File

@ -0,0 +1,13 @@
import modbus_rt
import modbus_rt_defines as cst
ip_addr = "192.168.28.150"
rst = modbus_rt.rtu()
rst.set_over_type(cst.OVER_NET)
rst.set_net(ip_addr, 502, cst.SOCK_STREAM)
rst.add_block("A",cst.REGISTERS, 0, 10)
rst.open()
# rst.excuse(0,4,0,5)
# rst.excuse(1,4,0,5,[1,2,34,5,6])
# rst.excuse(0,4,0,5)

View File

@ -0,0 +1,13 @@
import modbus_rt
import modbus_rt_defines as cst
ip_addr = "192.168.28.150"
rsu = modbus_rt.rtu()
rsu.set_over_type(cst.OVER_NET)
rsu.set_net(ip_addr, 502, cst.SOCK_DGRAM)
rsu.add_block("A",cst.REGISTERS, 0, 10)
rsu.open()
# rsu.excuse(0,4,0,5)
# rsu.excuse(1,4,0,5,[1,2,34,5,6])
# rsu.excuse(0,4,0,5)

View File

@ -0,0 +1,46 @@
import modbus_rt
import modbus_rt_defines as cst
serial_name = "/dev/ttyUSB0"
ip_addr = ""
rm = modbus_rt.rtu(cst.MASTER)
rm.set_serial(serial_name)
rm.open()
ts = modbus_rt.tcp()
ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
def pre_call(evt) :
slave = evt.slave
function = evt.function
addr = evt.addr
quantity = evt.quantity
if cst.READ_HOLDING_REGISTERS == function:
data = rm.excuse(slave, function, addr, quantity)
ts.excuse(cst.WRITE, cst.REGISTERS, addr, quantity, data)
elif cst.READ_DISCRETE_INPUTS == function:
data = rm.excuse(slave, function, addr, quantity)
ts.excuse(cst.WRITE, cst.INPUTS, addr, quantity, data)
def done_call(evt) :
slave = evt.slave
function = evt.function
addr = evt.addr
quantity = evt.quantity
if cst.WRITE_SINGLE_COIL == function:
data = ts.excuse(cst.READ, cst.CIOLS, addr, 1)
rm.excuse(slave, function, addr, data[0])
elif cst.WRITE_SINGLE_REGISTER == function:
data = ts.excuse(cst.READ, cst.REGISTERS, addr, 1)
rm.excuse(slave, function, addr, data[0])
elif cst.WRITE_MULTIPLE_COILS == function:
data = ts.excuse(cst.READ, cst.CIOLS, addr, quantity)
rm.excuse(slave, function, addr, quantity, data)
elif cst.WRITE_MULTIPLE_REGISTERS == function:
data = ts.excuse(0, cst.REGISTERS, addr, quantity)
rm.excuse(slave, function, addr, quantity, data)
ts.add_block("A", 0, 20000, 10)
ts.add_block("B", 1, 10000, 16)
ts.add_block("C", 4, 0, 10)
ts.set_strict(0)
ts.set_pre_ans_callback(pre_call)
ts.set_done_callback(done_call)
ts.open()

View File

@ -0,0 +1,12 @@
import modbus_rt
import modbus_rt_defines as cst
ip_addr = "192.168.28.150"
tm = modbus_rt.tcp(cst.MASTER)
tm.set_net(ip_addr, 0, cst.SOCK_STREAM)
tm.set_server(ip_addr, 502)
tm.open()
# tm.excuse(1,3,0,5)
# tm.excuse(1,16,0,5,[21,23,24,25,65])
# tm.excuse(1,3,0,5)

View File

@ -0,0 +1,13 @@
import modbus_rt
import modbus_rt_defines as cst
ip_addr = "192.168.28.150"
ts = modbus_rt.tcp()
ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
ts.add_block("A",cst.REGISTERS, 0, 10)
ts.open()
# ts.excuse(0,4,0,5)
# ts.excuse(1,4,0,5,[1,2,34,5,6])
# ts.excuse(0,4,0,5)

View File

@ -0,0 +1,14 @@
import modbus_rt
import modbus_rt_defines as cst
ip_addr = "192.168.28.150"
tmu = modbus_rt.tcp(cst.MASTER)
tmu.set_net(ip_addr, 0, cst.SOCK_DGRAM)
tmu.open()
# tmu.set_server("255.255.255.255", 502)
# tmu.excuse(1,3,0,5)
# tmu.get_saddr()
# tmu.excuse(1,16,0,5,[21,23,24,25,65])
# tmu.excuse(1,3,0,5)

View File

@ -0,0 +1,13 @@
import modbus_rt
import modbus_rt_defines as cst
ip_addr = "192.168.28.150"
tsu = modbus_rt.tcp()
tsu.set_net(ip_addr, 502, cst.SOCK_DGRAM)
tsu.add_block("A",cst.REGISTERS, 0, 10)
tsu.open()
# tsu.excuse(0,4,0,5)
# tsu.excuse(1,4,0,5,[1,2,34,5,6])
# tsu.excuse(0,4,0,5)