modebus rtu_master test ok

This commit is contained in:
pikastech 2022-09-19 13:27:20 +08:00
parent 2f8d15ef9a
commit 600950381e
10 changed files with 327 additions and 162 deletions

View File

@ -0,0 +1,13 @@
import modbus
mb_tcp = modbus.ModBusRTU(128, 128)
mb_tcp.setSlave(1)
send_buff = mb_tcp.serializeReadRegisters(0, 10)
host_regists = mb_tcp.deserializeReadRegisters(
b'\x01\x03\x14\x00\x00\x00\x00\x04\xD2\x00\x00\x00\x00\x00\x7B\x00\x00\x00\x00\x00\x00\x00\x00\xE5\x0B'
)
print(send_buff)
print(host_regists)

View File

@ -5,14 +5,14 @@ void _modbus__ModBus___init__rtu(PikaObj* self,
int sendBUffSize,
int readBuffSize) {
agile_modbus_rtu_t ctx_rtu = {0};
agile_modbus_t* ctx = &ctx_rtu._ctx;
obj_setBytes(self, "sendBuff", NULL, sendBUffSize);
obj_setBytes(self, "readBuff", NULL, readBuffSize);
agile_modbus_rtu_init(&ctx_rtu, obj_getBytes(self, "sendBuff"),
sendBUffSize, obj_getBytes(self, "readBuff"),
readBuffSize);
obj_setStruct(self, "ctx_rtu", ctx_rtu);
obj_setPtr(self, "ctx", ctx);
agile_modbus_rtu_t* ctx_rtu_heap = obj_getStruct(self, "ctx_rtu");
obj_setPtr(self, "ctx", &ctx_rtu_heap->_ctx);
}
void _modbus__ModBus_setSlave(PikaObj* self, int slave) {
@ -24,14 +24,14 @@ void _modbus__ModBus___init__tcp(PikaObj* self,
int sendBuffSize,
int readBuffSize) {
agile_modbus_tcp_t ctx_tcp = {0};
agile_modbus_t* ctx = &ctx_tcp._ctx;
obj_setBytes(self, "sendBuff", NULL, sendBuffSize);
obj_setBytes(self, "readBuff", NULL, readBuffSize);
agile_modbus_tcp_init(&ctx_tcp, obj_getBytes(self, "sendBuff"),
sendBuffSize, obj_getBytes(self, "readBuff"),
readBuffSize);
obj_setStruct(self, "ctx_tcp", ctx_tcp);
obj_setPtr(self, "ctx", ctx);
agile_modbus_tcp_t* ctx_tcp_heap = obj_getStruct(self, "ctx_tcp");
obj_setPtr(self, "ctx", &ctx_tcp_heap->_ctx);
}
int _modbus__ModBus_deserializeMaskWriteRegister(PikaObj* self, int msgLength) {
@ -39,51 +39,54 @@ int _modbus__ModBus_deserializeMaskWriteRegister(PikaObj* self, int msgLength) {
return agile_modbus_deserialize_mask_write_register(ctx, msgLength);
}
int _modbus__ModBus_deserializeReadBits(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadRegisters(PikaObj* self, int msgLength) {
uint16_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_bits(ctx, msgLength, dest);
int len = agile_modbus_deserialize_read_registers(ctx, msgLength,
(uint16_t*)buff);
return arg_newBytes((uint8_t*)buff, len * 2);
}
int _modbus__ModBus_deserializeReadInputBits(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadBits(PikaObj* self, int msgLength) {
uint8_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_input_bits(ctx, msgLength, dest);
int len = agile_modbus_deserialize_read_bits(ctx, msgLength, buff);
return arg_newBytes(buff, len);
}
int _modbus__ModBus_deserializeReadInputRegisters(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadInputBits(PikaObj* self, int msgLength) {
uint8_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_input_registers(ctx, msgLength,
(uint16_t*)dest);
int len = agile_modbus_deserialize_read_input_bits(ctx, msgLength, buff);
return arg_newBytes(buff, len);
}
int _modbus__ModBus_deserializeReadRegisters(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadInputRegisters(PikaObj* self,
int msgLength) {
uint16_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_registers(ctx, msgLength,
(uint16_t*)dest);
int len = agile_modbus_deserialize_read_input_registers(ctx, msgLength,
(uint16_t*)buff);
return arg_newBytes((uint8_t*)buff, len * 2);
}
int _modbus__ModBus_deserializeReportSlaveId(PikaObj* self,
int msgLength,
int maxDest,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReportSlaveId(PikaObj* self,
int msgLength,
int maxDest) {
uint8_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_report_slave_id(ctx, msgLength, maxDest,
dest);
int len = agile_modbus_deserialize_report_slave_id(ctx, msgLength, maxDest,
(uint8_t*)buff);
return arg_newBytes(buff, len);
}
int _modbus__ModBus_deserializeWriteAndReadRegisters(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeWriteAndReadRegisters(PikaObj* self,
int msgLength) {
uint16_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_write_and_read_registers(ctx, msgLength,
(uint16_t*)dest);
int len = agile_modbus_deserialize_write_and_read_registers(
ctx, msgLength, (uint16_t*)buff);
return arg_newBytes((uint8_t*)buff, len * 2);
}
int _modbus__ModBus_deserializeWriteBit(PikaObj* self, int msgLength) {

View File

@ -1,30 +1,30 @@
class _ModBus:
def setSlave(self, slave: int): ...
def serializeReadBits(self, addr: int, nb: int) -> int: ...
def serializeReadInputBits(self, addr: int, nb: int) -> int: ...
def serializeReadRegisters(self, addr: int, nb: int) -> int: ...
def serializeReadInputRegisters(self, addr: int, nb: int) -> int: ...
def serializeWriteBit(self, addr: int, status: int) -> int: ...
def deserializeWriteBit(self, msgLength: int) -> int: ...
def serializeWriteRegister(self, addr: int, value: int) -> int: ...
def serializeMaskWriteRegister(self, addr: int, andMask: int, orMask: int) -> int: ...
def serializeReportSlaveId(self) -> int: ...
def serializeWriteAndReadRegisters(self, writeAddr: int, writeNb: int, src: bytes, readAddr: int, readNb: int) -> int: ...
def serializeWriteBits(self, addr: int, nb: int, src: bytes) -> int: ...
def serializeWriteRegisters(self, addr: int, nb: int, src: bytes) -> int: ...
def deserializeWriteBit(self, msgLength: int) -> int: ...
def deserializeWriteRegister(self, msgLength: int) -> int: ...
def deserializeWriteBits(self, msgLength: int) -> int: ...
def deserializeWriteRegisters(self, msgLength: int) -> int: ...
def serializeMaskWriteRegister(self, addr: int, andMask: int, orMask: int) -> int: ...
def deserializeMaskWriteRegister(self, msgLength: int) -> int: ...
def serializeWriteAndReadRegisters(self, writeAddr: int, writeNb: int, src: bytes, readAddr: int, readNb: int) -> int: ...
def serializeReportSlaveId(self) -> int: ...
def deserializeReadBits(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReadInputBits(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReadRegisters(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReadInputRegisters(self, msgLength: int, dest: bytes) -> int: ...
def serializeWriteBits(self, addr: int, nb: int, src: bytes) -> int: ...
def serializeWriteRegisters(self, addr: int, nb: int, src: bytes) -> int: ...
def deserializeWriteAndReadRegisters(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReportSlaveId(self, msgLength: int, maxDest: int, dest: bytes) -> int: ...
def deserializeReadBits(self, msgLength: int) -> bytes: ...
def deserializeReadInputBits(self, msgLength: int) -> bytes: ...
def deserializeReadRegisters(self, msgLength: int) -> bytes: ...
def deserializeReadInputRegisters(self, msgLength: int) -> bytes: ...
def deserializeWriteAndReadRegisters(self, msgLength: int) -> bytes: ...
def deserializeReportSlaveId(self, msgLength: int, maxDest: int) -> bytes: ...
def getSendBuff(self) -> bytes: ...
def getReadBuff(self) -> bytes: ...

View File

@ -2,48 +2,88 @@ import _modbus
class ModBus(_modbus._ModBus):
def deserializeReadBits(self, msgLength: int) -> list:
dest = bytes(msgLength)
super().deserializeReadBits(msgLength, dest)
return list(dest)
def deserializeReadInputBits(self, msgLength: int) -> list:
dest = bytes(msgLength)
super().deserializeReadInputBits(msgLength, dest)
return list(dest)
def serializeWriteBits(self, addr: int, nb: int, src: list) -> bytes:
lenth = super().serializeWriteBits(addr, nb, bytes(src))
return self.sendBuff[0:lenth]
def deserializeReadRegisters(self, msgLength: int) -> list:
dest = bytes(2 * msgLength)
super().deserializeReadRegisters(msgLength, dest)
ret = []
for i in range(0, len(dest), 2):
ret.append(dest[i] + dest[i + 1] * 256)
return ret
def deserializeReadInputRegisters(self, msgLength: int) -> list:
dest = bytes(2 * msgLength)
super().deserializeReadInputRegisters(msgLength, dest)
ret = []
for i in range(0, len(dest), 2):
ret.append(dest[i] + dest[i + 1] * 256)
return ret
def serializeWriteBits(self, addr: int, nb: int, src: list):
return super().serializeWriteBits(addr, nb, bytes(src))
def serializeWriteRegisters(self, addr: int, nb: int, src: list):
def serializeWriteRegisters(self, addr: int, nb: int, src: list) -> bytes:
_src = bytes(2 * len(src))
for i in range(len(src)):
_src[2 * i] = src[i] % 256
_src[2 * i + 1] = src[i] // 256
return super().serializeWriteRegisters(addr, nb, _src)
lenth = super().serializeWriteRegisters(addr, nb, _src)
return self.sendBuff[0:lenth]
def deserializeWriteAndReadRegisters(self, msgLength: int) -> list:
dest = bytes(2 * msgLength)
super().deserializeWriteAndReadRegisters(msgLength, dest)
def serializeReadBits(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadBits(addr, nb)
return self.sendBuff[0:lenth]
def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadInputBits(addr, nb)
return self.sendBuff[0:lenth]
def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadRegisters(addr, nb)
return self.sendBuff[0:lenth]
def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadInputRegisters(addr, nb)
return self.sendBuff[0:lenth]
def serializeWriteBit(self, addr: int, status: int) -> bytes:
lenth = super().serializeWriteBit(addr, status)
return self.sendBuff[0:lenth]
def serializeWriteRegister(self, addr: int, value: int) -> bytes:
lenth = super().serializeWriteRegister(addr, value)
return self.sendBuff[0:lenth]
def serializeMaskWriteRegister(self, addr: int, andMask: int, orMask: int) -> bytes:
lenth = super().serializeMaskWriteRegister(addr, andMask, orMask)
return self.sendBuff[0:lenth]
def serializeReportSlaveId(self) -> int:
lenth = super().serializeReportSlaveId()
return self.sendBuff[0:lenth]
def deserializeReadRegisters(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadRegisters(length)
ret = []
for i in range(0, len(dest), 2):
ret.append(dest[i] + dest[i + 1] * 256)
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
return ret
def deserializeReadBits(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadBits(length)
return list(dest)
def deserializeReadInputBits(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputBits(length)
return list(dest)
def deserializeReadInputRegisters(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputRegisters(length)
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
return ret
def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeWriteAndReadRegisters(length)
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
return ret

View File

@ -1,30 +1,30 @@
class _ModBus:
def setSlave(self, slave: int): ...
def serializeReadBits(self, addr: int, nb: int) -> int: ...
def serializeReadInputBits(self, addr: int, nb: int) -> int: ...
def serializeReadRegisters(self, addr: int, nb: int) -> int: ...
def serializeReadInputRegisters(self, addr: int, nb: int) -> int: ...
def serializeWriteBit(self, addr: int, status: int) -> int: ...
def deserializeWriteBit(self, msgLength: int) -> int: ...
def serializeWriteRegister(self, addr: int, value: int) -> int: ...
def serializeMaskWriteRegister(self, addr: int, andMask: int, orMask: int) -> int: ...
def serializeReportSlaveId(self) -> int: ...
def serializeWriteAndReadRegisters(self, writeAddr: int, writeNb: int, src: bytes, readAddr: int, readNb: int) -> int: ...
def serializeWriteBits(self, addr: int, nb: int, src: bytes) -> int: ...
def serializeWriteRegisters(self, addr: int, nb: int, src: bytes) -> int: ...
def deserializeWriteBit(self, msgLength: int) -> int: ...
def deserializeWriteRegister(self, msgLength: int) -> int: ...
def deserializeWriteBits(self, msgLength: int) -> int: ...
def deserializeWriteRegisters(self, msgLength: int) -> int: ...
def serializeMaskWriteRegister(self, addr: int, andMask: int, orMask: int) -> int: ...
def deserializeMaskWriteRegister(self, msgLength: int) -> int: ...
def serializeWriteAndReadRegisters(self, writeAddr: int, writeNb: int, src: bytes, readAddr: int, readNb: int) -> int: ...
def serializeReportSlaveId(self) -> int: ...
def deserializeReadBits(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReadInputBits(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReadRegisters(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReadInputRegisters(self, msgLength: int, dest: bytes) -> int: ...
def serializeWriteBits(self, addr: int, nb: int, src: bytes) -> int: ...
def serializeWriteRegisters(self, addr: int, nb: int, src: bytes) -> int: ...
def deserializeWriteAndReadRegisters(self, msgLength: int, dest: bytes) -> int: ...
def deserializeReportSlaveId(self, msgLength: int, maxDest: int, dest: bytes) -> int: ...
def deserializeReadBits(self, msgLength: int) -> bytes: ...
def deserializeReadInputBits(self, msgLength: int) -> bytes: ...
def deserializeReadRegisters(self, msgLength: int) -> bytes: ...
def deserializeReadInputRegisters(self, msgLength: int) -> bytes: ...
def deserializeWriteAndReadRegisters(self, msgLength: int) -> bytes: ...
def deserializeReportSlaveId(self, msgLength: int, maxDest: int) -> bytes: ...
def getSendBuff(self) -> bytes: ...
def getReadBuff(self) -> bytes: ...

View File

@ -2,48 +2,88 @@ import _modbus
class ModBus(_modbus._ModBus):
def deserializeReadBits(self, msgLength: int) -> list:
dest = bytes(msgLength)
super().deserializeReadBits(msgLength, dest)
return list(dest)
def deserializeReadInputBits(self, msgLength: int) -> list:
dest = bytes(msgLength)
super().deserializeReadInputBits(msgLength, dest)
return list(dest)
def serializeWriteBits(self, addr: int, nb: int, src: list) -> bytes:
lenth = super().serializeWriteBits(addr, nb, bytes(src))
return self.sendBuff[0:lenth]
def deserializeReadRegisters(self, msgLength: int) -> list:
dest = bytes(2 * msgLength)
super().deserializeReadRegisters(msgLength, dest)
ret = []
for i in range(0, len(dest), 2):
ret.append(dest[i] + dest[i + 1] * 256)
return ret
def deserializeReadInputRegisters(self, msgLength: int) -> list:
dest = bytes(2 * msgLength)
super().deserializeReadInputRegisters(msgLength, dest)
ret = []
for i in range(0, len(dest), 2):
ret.append(dest[i] + dest[i + 1] * 256)
return ret
def serializeWriteBits(self, addr: int, nb: int, src: list):
return super().serializeWriteBits(addr, nb, bytes(src))
def serializeWriteRegisters(self, addr: int, nb: int, src: list):
def serializeWriteRegisters(self, addr: int, nb: int, src: list) -> bytes:
_src = bytes(2 * len(src))
for i in range(len(src)):
_src[2 * i] = src[i] % 256
_src[2 * i + 1] = src[i] // 256
return super().serializeWriteRegisters(addr, nb, _src)
lenth = super().serializeWriteRegisters(addr, nb, _src)
return self.sendBuff[0:lenth]
def deserializeWriteAndReadRegisters(self, msgLength: int) -> list:
dest = bytes(2 * msgLength)
super().deserializeWriteAndReadRegisters(msgLength, dest)
def serializeReadBits(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadBits(addr, nb)
return self.sendBuff[0:lenth]
def serializeReadInputBits(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadInputBits(addr, nb)
return self.sendBuff[0:lenth]
def serializeReadRegisters(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadRegisters(addr, nb)
return self.sendBuff[0:lenth]
def serializeReadInputRegisters(self, addr: int, nb: int) -> bytes:
lenth = super().serializeReadInputRegisters(addr, nb)
return self.sendBuff[0:lenth]
def serializeWriteBit(self, addr: int, status: int) -> bytes:
lenth = super().serializeWriteBit(addr, status)
return self.sendBuff[0:lenth]
def serializeWriteRegister(self, addr: int, value: int) -> bytes:
lenth = super().serializeWriteRegister(addr, value)
return self.sendBuff[0:lenth]
def serializeMaskWriteRegister(self, addr: int, andMask: int, orMask: int) -> bytes:
lenth = super().serializeMaskWriteRegister(addr, andMask, orMask)
return self.sendBuff[0:lenth]
def serializeReportSlaveId(self) -> int:
lenth = super().serializeReportSlaveId()
return self.sendBuff[0:lenth]
def deserializeReadRegisters(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadRegisters(length)
ret = []
for i in range(0, len(dest), 2):
ret.append(dest[i] + dest[i + 1] * 256)
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
return ret
def deserializeReadBits(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadBits(length)
return list(dest)
def deserializeReadInputBits(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputBits(length)
return list(dest)
def deserializeReadInputRegisters(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeReadInputRegisters(length)
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
return ret
def deserializeWriteAndReadRegisters(self, msg: bytes) -> list:
self.readBuff = msg
length = len(msg)
dest = super().deserializeWriteAndReadRegisters(length)
ret = []
for i in range(0, len(dest), 2):
ret.append(int(dest[i]) + int(dest[i + 1]) * 256)
return ret

View File

@ -5,14 +5,14 @@ void _modbus__ModBus___init__rtu(PikaObj* self,
int sendBUffSize,
int readBuffSize) {
agile_modbus_rtu_t ctx_rtu = {0};
agile_modbus_t* ctx = &ctx_rtu._ctx;
obj_setBytes(self, "sendBuff", NULL, sendBUffSize);
obj_setBytes(self, "readBuff", NULL, readBuffSize);
agile_modbus_rtu_init(&ctx_rtu, obj_getBytes(self, "sendBuff"),
sendBUffSize, obj_getBytes(self, "readBuff"),
readBuffSize);
obj_setStruct(self, "ctx_rtu", ctx_rtu);
obj_setPtr(self, "ctx", ctx);
agile_modbus_rtu_t* ctx_rtu_heap = obj_getStruct(self, "ctx_rtu");
obj_setPtr(self, "ctx", &ctx_rtu_heap->_ctx);
}
void _modbus__ModBus_setSlave(PikaObj* self, int slave) {
@ -24,14 +24,14 @@ void _modbus__ModBus___init__tcp(PikaObj* self,
int sendBuffSize,
int readBuffSize) {
agile_modbus_tcp_t ctx_tcp = {0};
agile_modbus_t* ctx = &ctx_tcp._ctx;
obj_setBytes(self, "sendBuff", NULL, sendBuffSize);
obj_setBytes(self, "readBuff", NULL, readBuffSize);
agile_modbus_tcp_init(&ctx_tcp, obj_getBytes(self, "sendBuff"),
sendBuffSize, obj_getBytes(self, "readBuff"),
readBuffSize);
obj_setStruct(self, "ctx_tcp", ctx_tcp);
obj_setPtr(self, "ctx", ctx);
agile_modbus_tcp_t* ctx_tcp_heap = obj_getStruct(self, "ctx_tcp");
obj_setPtr(self, "ctx", &ctx_tcp_heap->_ctx);
}
int _modbus__ModBus_deserializeMaskWriteRegister(PikaObj* self, int msgLength) {
@ -39,51 +39,54 @@ int _modbus__ModBus_deserializeMaskWriteRegister(PikaObj* self, int msgLength) {
return agile_modbus_deserialize_mask_write_register(ctx, msgLength);
}
int _modbus__ModBus_deserializeReadBits(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadRegisters(PikaObj* self, int msgLength) {
uint16_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_bits(ctx, msgLength, dest);
int len = agile_modbus_deserialize_read_registers(ctx, msgLength,
(uint16_t*)buff);
return arg_newBytes((uint8_t*)buff, len * 2);
}
int _modbus__ModBus_deserializeReadInputBits(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadBits(PikaObj* self, int msgLength) {
uint8_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_input_bits(ctx, msgLength, dest);
int len = agile_modbus_deserialize_read_bits(ctx, msgLength, buff);
return arg_newBytes(buff, len);
}
int _modbus__ModBus_deserializeReadInputRegisters(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadInputBits(PikaObj* self, int msgLength) {
uint8_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_input_registers(ctx, msgLength,
(uint16_t*)dest);
int len = agile_modbus_deserialize_read_input_bits(ctx, msgLength, buff);
return arg_newBytes(buff, len);
}
int _modbus__ModBus_deserializeReadRegisters(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReadInputRegisters(PikaObj* self,
int msgLength) {
uint16_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_read_registers(ctx, msgLength,
(uint16_t*)dest);
int len = agile_modbus_deserialize_read_input_registers(ctx, msgLength,
(uint16_t*)buff);
return arg_newBytes((uint8_t*)buff, len * 2);
}
int _modbus__ModBus_deserializeReportSlaveId(PikaObj* self,
int msgLength,
int maxDest,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeReportSlaveId(PikaObj* self,
int msgLength,
int maxDest) {
uint8_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_report_slave_id(ctx, msgLength, maxDest,
dest);
int len = agile_modbus_deserialize_report_slave_id(ctx, msgLength, maxDest,
(uint8_t*)buff);
return arg_newBytes(buff, len);
}
int _modbus__ModBus_deserializeWriteAndReadRegisters(PikaObj* self,
int msgLength,
uint8_t* dest) {
Arg* _modbus__ModBus_deserializeWriteAndReadRegisters(PikaObj* self,
int msgLength) {
uint16_t buff[128] = {0};
agile_modbus_t* ctx = obj_getPtr(self, "ctx");
return agile_modbus_deserialize_write_and_read_registers(ctx, msgLength,
(uint16_t*)dest);
int len = agile_modbus_deserialize_write_and_read_registers(
ctx, msgLength, (uint16_t*)buff);
return arg_newBytes((uint8_t*)buff, len * 2);
}
int _modbus__ModBus_deserializeWriteBit(PikaObj* self, int msgLength) {

View File

@ -1502,6 +1502,36 @@ TEST(vm, super_val) {
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}
TEST(vm, super_val_) {
/* init */
pikaMemInfo.heapUsedMax = 0;
PikaObj* pikaMain = newRootObj("pikaMain", New_PikaMain);
extern unsigned char pikaModules_py_a[];
obj_linkLibrary(pikaMain, pikaModules_py_a);
/* run */
__platform_printf("BEGIN\r\n");
obj_run(pikaMain,
"class test:\n"
" def test(self, a):\n"
" self.a = a\n"
"class test2(test):\n"
" def test(self, a):\n"
" super().test(str(a))\n"
"t1 = test()\n"
"t2 = test2()\n"
"t1.test(1)\n"
"t2.test(1)\n");
/* collect */
int t1_a = obj_getInt(pikaMain, "t1.a");
char* t2_a = obj_getStr(pikaMain, "t2.a");
/* assert */
EXPECT_EQ(t1_a, 1);
/* deinit */
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}
#endif
#if !PIKA_NANO_ENABLE

View File

@ -308,3 +308,26 @@ TEST(re, findall) {
EXPECT_EQ(pikaMemNow(), 0);
}
#endif
#if !PIKA_NANO_ENABLE
TEST(modbus, rtu_master) {
/* init */
pikaMemInfo.heapUsedMax = 0;
PikaObj* pikaMain = newRootObj("pikaMain", New_PikaMain);
extern unsigned char pikaModules_py_a[];
obj_linkLibrary(pikaMain, pikaModules_py_a);
/* run */
__platform_printf("BEGIN\r\n");
pikaVM_runSingleFile(pikaMain, "test/python/modbus/rtu_master.py");
/* collect */
/* assert */
EXPECT_STREQ(log_buff[2], "BEGIN\r\n");
EXPECT_STREQ(log_buff[1],
"b'\\x01\\x03\\x00\\x00\\x00\\x0a\\xc5\\xcd'\r\n");
EXPECT_STREQ(log_buff[0], "[0, 0, 1234, 0, 0, 123, 0, 0, 0, 0]\r\n");
/* deinit */
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}
#endif

View File

@ -0,0 +1,13 @@
import modbus
mb_tcp = modbus.ModBusRTU(128, 128)
mb_tcp.setSlave(1)
send_buff = mb_tcp.serializeReadRegisters(0, 10)
host_regists = mb_tcp.deserializeReadRegisters(
b'\x01\x03\x14\x00\x00\x00\x00\x04\xD2\x00\x00\x00\x00\x00\x7B\x00\x00\x00\x00\x00\x00\x00\x00\xE5\x0B'
)
print(send_buff)
print(host_regists)