add test for socket_json issue

This commit is contained in:
lyon 2023-02-15 11:25:38 +08:00
parent c8942776e9
commit e1fb488b29
3 changed files with 174 additions and 0 deletions

View File

@ -0,0 +1,73 @@
import socket
import _thread
import random
import time
import json
test_finished = False
server_started = False
test_data = {
'result': {
'a_a': {
'value': 0.290000, 'desc': 'A 相电流'
}
}, 'code': 0
}
def socket_server_task(host, port):
"""
socket 服务器任务
:return:
"""
print("socket server start:", host, port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(5)
print("socket server waiting accept")
global server_started
server_started = True
accept, addr = s.accept()
print("socket server accepted at", addr)
while True:
try:
data = accept.recv(1024)
print('socket server recv:', data.decode())
# accept.send(data)
accept.send(json.dumps(test_data))
except Exception:
print('socket server closing accept')
accept.close()
break
print("socket server closing")
s.close()
global test_finished
test_finished = True
def socket_server_init(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_server_task, (host, port))
def socket_client_task(host, port):
print("socket client start:", host, port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
for i in range(2):
client.send("hello".encode())
recv = client.recv(1024).decode()
print("client recv:", recv)
client.close()
def socket_server_test(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_client_task, (host, port))
test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
time.sleep(0.1)

View File

@ -260,6 +260,34 @@ TEST(socket, thread) {
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}
TEST(socket, json_issue) {
/* init */
pikaMemInfo.heapUsedMax = 0;
PikaObj* pikaMain = newRootObj("pikaMain", New_PikaMain);
extern unsigned char pikaModules_py_a[];
obj_linkLibrary(pikaMain, pikaModules_py_a);
/* run */
__platform_printf("BEGIN\r\n");
pikaVM_runSingleFile(pikaMain, "test/python/socket/socket_json.py");
/* collect */
/* assert */
EXPECT_STREQ(log_buff[2],
"client recv: "
"{\n\t\"code\":\t0,\n\t\"result\":\t{\n\t\t\"a_a\":\t{"
"\n\t\t\t\"desc\":\t\"A "
"\347\233\270\347\224\265\346\265\201\",\n\t\t\t\"value\":\t0."
"29\n\t\t}\n\t}\n}\r\n");
EXPECT_STREQ(log_buff[4],
"client recv: "
"{\n\t\"code\":\t0,\n\t\"result\":\t{\n\t\t\"a_a\":\t{"
"\n\t\t\t\"desc\":\t\"A "
"\347\233\270\347\224\265\346\265\201\",\n\t\t\t\"value\":\t0."
"29\n\t\t}\n\t}\n}\r\n");
/* deinit */
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}
#endif
#if !PIKA_NANO_ENABLE

View File

@ -0,0 +1,73 @@
import socket
import _thread
import random
import time
import json
test_finished = False
server_started = False
test_data = {
'result': {
'a_a': {
'value': 0.290000, 'desc': 'A 相电流'
}
}, 'code': 0
}
def socket_server_task(host, port):
"""
socket 服务器任务
:return:
"""
print("socket server start:", host, port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(5)
print("socket server waiting accept")
global server_started
server_started = True
accept, addr = s.accept()
print("socket server accepted at", addr)
while True:
try:
data = accept.recv(1024)
print('socket server recv:', data.decode())
# accept.send(data)
accept.send(json.dumps(test_data))
except Exception:
print('socket server closing accept')
accept.close()
break
print("socket server closing")
s.close()
global test_finished
test_finished = True
def socket_server_init(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_server_task, (host, port))
def socket_client_task(host, port):
print("socket client start:", host, port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
for i in range(2):
client.send("hello".encode())
recv = client.recv(1024).decode()
print("client recv:", recv)
client.close()
def socket_server_test(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_client_task, (host, port))
test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
time.sleep(0.1)