mirror of
https://gitee.com/Lyon1998/pikapython.git
synced 2025-02-05 17:28:23 +08:00
211 lines
8.7 KiB
Python
211 lines
8.7 KiB
Python
import _modbus_rt
|
|
import modbus_rt_defines as cst
|
|
|
|
LITTLE_ENDIAL_SWAP = 0
|
|
BIG_ENDIAL_SWAP = 1
|
|
LITTLE_ENDIAL = 2
|
|
BIG_ENDIAL = 3
|
|
|
|
SLAVE = 0
|
|
MASTER = 1
|
|
|
|
SOCK_STREAM = 1
|
|
SOCK_DGRAM = 2
|
|
|
|
class data_trans(_modbus_rt._data_trans):
|
|
def reg2reg(self, val: int):
|
|
return self._reg2reg(val)
|
|
def regs2regs(self, val: list):
|
|
return self._regs2regs(val)
|
|
def regs2bytes(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
if type(val) != list:
|
|
print("para only for type list")
|
|
return
|
|
return self._regs2bytes(val, mode)
|
|
def regs2str(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2str(val, mode)
|
|
def regs2signed(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2signed(val, mode)
|
|
def regs2int(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2uint(val, mode)
|
|
def regs2uint(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2int(val, mode)
|
|
def regs2long(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2long(val, mode)
|
|
def regs2float(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2float(val, mode)
|
|
def regs2double(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._regs2double(val, mode)
|
|
def bytes2regs(self, val: any, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._bytes2regs(val, mode)
|
|
def str2regs(self, val: str, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._str2regs(val, mode)
|
|
def signed2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._signed2regs(val, mode)
|
|
def int2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._int2regs(val, mode)
|
|
def uint2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._uint2regs(val, mode)
|
|
def long2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._long2regs(val, mode)
|
|
def float2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._float2regs(val, mode)
|
|
def double2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
|
|
return self._double2regs(val, mode)
|
|
|
|
class rtu(_modbus_rt._rtu):
|
|
mode = SLAVE
|
|
ascii_flag = 0
|
|
def __init__(self, mode = SLAVE):
|
|
return self._init(mode)
|
|
def set_serial(self, devname: str, baudrate = 115200, bytesize = 8, parity = 'N', stopbits = 1, xonxoff = 0):
|
|
return self._set_serial(devname, baudrate, bytesize, parity, stopbits, xonxoff)
|
|
def set_over_type(self, over_type: int):
|
|
return self._set_over_type(over_type)
|
|
def set_net(self, ip = '', port = 502, type = SOCK_STREAM):
|
|
return self._set_net(ip, port, type)
|
|
def set_ip(self, ip: str):
|
|
return self._set_ip(ip)
|
|
def set_port(self, port: int):
|
|
return self._set_port(port)
|
|
def set_type(self, type: int):
|
|
return self._set_type(type)
|
|
def set_p2p(self, p2p_flag: int):
|
|
return self._set_p2p(p2p_flag)
|
|
def open(self):
|
|
return self._open()
|
|
def isopen(self):
|
|
return self._isopen()
|
|
def close(self):
|
|
return self._close()
|
|
# 该函数仅对从机有效
|
|
def set_addr(self, addr: int):
|
|
return self._slave_set_addr(addr)
|
|
# 该函数仅对从机有效
|
|
def set_strict(self, strict: int):
|
|
return self._slave_set_strict(strict)
|
|
# 该函数仅对从机有效
|
|
def add_block(self, name: str, type: int, addr: int, nums: int):
|
|
return self._slave_add_block(name, type, addr, nums)
|
|
# 该函数仅对从机有效
|
|
def regs_binding(self, regs: bytes, type: int, addr: int, nums: int):
|
|
return self._slave_regs_binding(regs, type, addr, nums)
|
|
# 该函数仅对从机有效
|
|
def set_pre_ans_callback(self, cb):
|
|
return self._slave_set_pre_ans_callback(cb)
|
|
# 该函数仅对从机有效
|
|
def set_done_callback(self, cb):
|
|
return self._slave_set_done_callback(cb)
|
|
# 该函数仅对从机有效
|
|
def set_dev_binding(self, flag: int):
|
|
return self._slave_set_dev_binding(flag)
|
|
# 该函数仅对主机有效
|
|
def set_server(self, saddr: str,sport: int):
|
|
return self._master_set_server(saddr,sport)
|
|
# 该函数仅对主机有效
|
|
def get_saddr(self):
|
|
return self._master_get_saddr()
|
|
def excuse(self, dir_slave: int, type_function: int, addr: int, *val):
|
|
if self.mode == SLAVE:
|
|
if dir_slave == 0 :
|
|
return self._slave_read_regs(type_function, addr, *val)
|
|
elif dir_slave == 1 :
|
|
return self._slave_write_regs(type_function, addr, *val)
|
|
else :
|
|
return
|
|
elif self.mode == MASTER:
|
|
if type_function >= cst.READ_COILS and type_function <= cst.READ_INPUT_REGISTERS:
|
|
return self._master_read_list(dir_slave,type_function, addr, *val)
|
|
elif type_function >= cst.WRITE_SINGLE_COIL and type_function <= cst.WRITE_SINGLE_REGISTER:
|
|
return self._master_write_int(dir_slave,type_function, addr, *val)
|
|
elif type_function >= cst.WRITE_MULTIPLE_COILS and type_function <= cst.WRITE_MULTIPLE_REGISTERS:
|
|
return self._master_write_list(dir_slave,type_function, addr, *val)
|
|
else :
|
|
return
|
|
else :
|
|
return
|
|
# 该函数仅对主机有效
|
|
def download(self, slave: int, file_dev: str, file_master: str):
|
|
return self._master_download(slave, file_dev, file_master)
|
|
# 该函数仅对主机有效
|
|
def upload(self, slave: int, file_dev: str, file_master: str):
|
|
return self._master_upload(slave, file_dev, file_master)
|
|
|
|
class ascii(rtu):
|
|
mode = SLAVE
|
|
ascii_flag = 0
|
|
|
|
|
|
class tcp(_modbus_rt._tcp):
|
|
mode = SLAVE
|
|
def __init__(self, mode = SLAVE):
|
|
return self._init(mode)
|
|
def set_net(self, ip = '', port = 502, type = SOCK_STREAM):
|
|
return self._set_net(ip, port, type)
|
|
def set_ip(self, ip: str):
|
|
return self._set_ip(ip)
|
|
def set_port(self, port: int):
|
|
return self._set_port(port)
|
|
def set_type(self, type: int):
|
|
return self._set_type(type)
|
|
def set_p2p(self, p2p_flag: int):
|
|
return self._set_p2p(p2p_flag)
|
|
def open(self):
|
|
return self._open()
|
|
def isopen(self):
|
|
return self._isopen()
|
|
def close(self):
|
|
return self._close()
|
|
# 该函数仅对从机有效
|
|
def set_addr(self, addr: int):
|
|
return self._slave_set_addr(addr)
|
|
# 该函数仅对从机有效
|
|
def set_strict(self, strict: int):
|
|
return self._slave_set_strict(strict)
|
|
# 该函数仅对从机有效
|
|
def add_block(self, name: str, type: int, addr: int, nums: int):
|
|
return self._slave_add_block(name, type, addr, nums)
|
|
# 该函数仅对从机有效
|
|
def regs_binding(self, regs: bytes, type: int, addr: int, nums: int):
|
|
return self._slave_regs_binding(regs, type, addr, nums)
|
|
# 该函数仅对从机有效
|
|
def set_pre_ans_callback(self, cb):
|
|
return self._slave_set_pre_ans_callback(cb)
|
|
# 该函数仅对从机有效
|
|
def set_done_callback(self, cb):
|
|
return self._slave_set_done_callback(cb)
|
|
# 该函数仅对从机有效
|
|
def set_dev_binding(self, flag: int):
|
|
return self._slave_set_dev_binding(flag)
|
|
# 该函数仅对主机有效
|
|
def set_server(self, saddr: str,sport: int):
|
|
return self._master_set_server(saddr,sport)
|
|
# 该函数仅对主机有效
|
|
def get_saddr(self):
|
|
return self._master_get_saddr()
|
|
def excuse(self, dir_slave: int, type_function: int, addr: int, *val):
|
|
if self.mode == SLAVE:
|
|
if dir_slave == 0 :
|
|
return self._slave_read_regs(type_function, addr, *val)
|
|
elif dir_slave == 1 :
|
|
return self._slave_write_regs(type_function, addr, *val)
|
|
else :
|
|
return
|
|
elif self.mode == MASTER:
|
|
if type_function >= cst.READ_COILS and type_function <= cst.READ_INPUT_REGISTERS:
|
|
return self._master_read_list(dir_slave,type_function, addr, *val)
|
|
elif type_function >= cst.WRITE_SINGLE_COIL and type_function <= cst.WRITE_SINGLE_REGISTER:
|
|
return self._master_write_int(dir_slave,type_function, addr, *val)
|
|
elif type_function >= cst.WRITE_MULTIPLE_COILS and type_function <= cst.WRITE_MULTIPLE_REGISTERS:
|
|
return self._master_write_list(dir_slave,type_function, addr, *val)
|
|
else :
|
|
return
|
|
else :
|
|
return
|
|
# 该函数仅对主机有效
|
|
def download(self, slave: int, file_dev: str, file_master: str):
|
|
return self._master_download(slave, file_dev, file_master)
|
|
# 该函数仅对主机有效
|
|
def upload(self, slave: int, file_dev: str, file_master: str):
|
|
return self._master_upload(slave, file_dev, file_master)
|