use requests.pyi

requests.get() method ok

test requests.get passed
This commit is contained in:
lyon 2022-11-13 18:41:23 +08:00
parent adb8df45af
commit 3837c3aafd
13 changed files with 268 additions and 7 deletions

View File

@ -0,0 +1,97 @@
#include "requests.h"
#include "requests_Response.h"
#include "webclient.h"
#define GET_HEADER_BUFSZ 1024
#define GET_RESP_BUFSZ 1024
PikaObj* requests_request(PikaObj* self,
char* method,
char* url,
PikaDict* kwargs) {
PikaObj* response = newNormalObj(New_requests_Response);
obj_setStr(response, "url", url);
struct webclient_session* session = RT_NULL;
unsigned char* buffer = RT_NULL;
int ret = 0;
int bytes_read, resp_status;
int content_length = -1;
buffer = (unsigned char*)web_calloc(1, GET_RESP_BUFSZ);
if (buffer == RT_NULL) {
rt_kprintf("no memory for receive buffer.\n");
ret = -RT_ENOMEM;
goto __exit;
}
/* create webclient session and set header response size */
session = webclient_session_create(GET_HEADER_BUFSZ);
if (session == RT_NULL) {
ret = -RT_ENOMEM;
goto __exit;
}
if (strEqu("GET", method)) {
resp_status = webclient_get(session, url);
obj_setInt(response, "status_code", resp_status);
/* send GET request by default header */
if (resp_status != 200) {
goto __exit;
}
content_length = webclient_content_length_get(session);
obj_setInt(response, "content_length", content_length);
if (content_length < 0) {
// rt_kprintf("webclient GET request type is chunked.\n");
do {
bytes_read =
webclient_read(session, (void*)buffer, GET_RESP_BUFSZ);
if (bytes_read <= 0) {
break;
}
} while (1);
} else {
int content_pos = 0;
do {
bytes_read =
webclient_read(session, (void*)buffer,
content_length - content_pos > GET_RESP_BUFSZ
? GET_RESP_BUFSZ
: content_length - content_pos);
if (bytes_read <= 0) {
break;
}
content_pos += bytes_read;
} while (content_pos < content_length);
}
obj_setStr(response, "text", (char*)buffer);
goto __exit;
}
__exit:
if (session) {
webclient_close(session);
}
if (buffer) {
web_free(buffer);
}
if (ret != 0) {
if (response) {
obj_deinit(response);
}
response = NULL;
}
return response;
}
PikaObj* requests_get(PikaObj* self, char* url, PikaDict* kwargs) {
return requests_request(self, "GET", url, kwargs);
}
void requests___del__(PikaObj* self) {}
void requests___init__(PikaObj* self) {}

View File

@ -1 +0,0 @@
import _requests

View File

@ -0,0 +1,18 @@
class Response:
content_length: int
text: str
state_code: int
headers: dict
url: str
def json(self) -> dict: ...
def request(method: str, url: str, **kwargs) -> Response: ...
def get(url: str, **kwargs) -> Response: ...
def __init__(): ...
def __del__(): ...

View File

@ -0,0 +1,6 @@
#include "requests_Response.h"
PikaObj* requests_Response_json(PikaObj* self) {
/* TODO */
return NULL;
}

View File

@ -86,7 +86,8 @@
"errno.h": "c",
"ctype.h": "c",
"mqttclient.h": "c",
"_mqtt__mqtt.h": "c"
"_mqtt__mqtt.h": "c",
"requests_response.h": "c"
},
"python.formatting.provider": "autopep8",
"C_Cpp.errorSquiggles": "Disabled"

View File

@ -0,0 +1,97 @@
#include "requests.h"
#include "requests_Response.h"
#include "webclient.h"
#define GET_HEADER_BUFSZ 1024
#define GET_RESP_BUFSZ 1024
PikaObj* requests_request(PikaObj* self,
char* method,
char* url,
PikaDict* kwargs) {
PikaObj* response = newNormalObj(New_requests_Response);
obj_setStr(response, "url", url);
struct webclient_session* session = RT_NULL;
unsigned char* buffer = RT_NULL;
int ret = 0;
int bytes_read, resp_status;
int content_length = -1;
buffer = (unsigned char*)web_calloc(1, GET_RESP_BUFSZ);
if (buffer == RT_NULL) {
rt_kprintf("no memory for receive buffer.\n");
ret = -RT_ENOMEM;
goto __exit;
}
/* create webclient session and set header response size */
session = webclient_session_create(GET_HEADER_BUFSZ);
if (session == RT_NULL) {
ret = -RT_ENOMEM;
goto __exit;
}
if (strEqu("GET", method)) {
resp_status = webclient_get(session, url);
obj_setInt(response, "status_code", resp_status);
/* send GET request by default header */
if (resp_status != 200) {
goto __exit;
}
content_length = webclient_content_length_get(session);
obj_setInt(response, "content_length", content_length);
if (content_length < 0) {
// rt_kprintf("webclient GET request type is chunked.\n");
do {
bytes_read =
webclient_read(session, (void*)buffer, GET_RESP_BUFSZ);
if (bytes_read <= 0) {
break;
}
} while (1);
} else {
int content_pos = 0;
do {
bytes_read =
webclient_read(session, (void*)buffer,
content_length - content_pos > GET_RESP_BUFSZ
? GET_RESP_BUFSZ
: content_length - content_pos);
if (bytes_read <= 0) {
break;
}
content_pos += bytes_read;
} while (content_pos < content_length);
}
obj_setStr(response, "text", (char*)buffer);
goto __exit;
}
__exit:
if (session) {
webclient_close(session);
}
if (buffer) {
web_free(buffer);
}
if (ret != 0) {
if (response) {
obj_deinit(response);
}
response = NULL;
}
return response;
}
PikaObj* requests_get(PikaObj* self, char* url, PikaDict* kwargs) {
return requests_request(self, "GET", url, kwargs);
}
void requests___del__(PikaObj* self) {}
void requests___init__(PikaObj* self) {}

View File

@ -0,0 +1,6 @@
#include "requests_Response.h"
PikaObj* requests_Response_json(PikaObj* self) {
/* TODO */
return NULL;
}

View File

@ -1 +0,0 @@
import _requests

View File

@ -0,0 +1,18 @@
class Response:
content_length: int
text: str
state_code: int
headers: dict
url: str
def json(self) -> dict: ...
def request(method: str, url: str, **kwargs) -> Response: ...
def get(url: str, **kwargs) -> Response: ...
def __init__(): ...
def __del__(): ...

View File

@ -45,9 +45,9 @@
#define RT_EINTR 9 /**< Interrupted system call */
#define RT_EINVAL 10 /**< Invalid argument */
#define LOG_E(fmt, ...) __platform_printf(fmt, ##__VA_ARGS__)
#define LOG_W(...) LOG_E(__VA_ARGS__)
#define LOG_D(...) LOG_E(__VA_ARGS__)
#define LOG_E(fmt, ...) __platform_printf(fmt "\r\n", ##__VA_ARGS__)
#define LOG_W(...)
#define LOG_D(...)
#define RT_ASSERT(...) pika_assert(__VA_ARGS__)

View File

@ -187,10 +187,30 @@ int webclient_get_test(int argc, char** argv) {
}
}
TEST(network, get) {
TEST(requests, webclient_get) {
char* argv1[] = {"test", "http://www.rt-thread.com/service/rt-thread.txt"};
EXPECT_EQ(webclient_get_test(2, argv1), 0);
char* argv2[] = {"test", "-s",
"http://www.rt-thread.com/service/rt-thread.txt"};
EXPECT_EQ(webclient_get_test(3, argv2), 0);
}
TEST(requests, get) {
PikaObj* pikaMain = newRootObj("pikaMain", New_PikaMain);
extern unsigned char pikaModules_py_a[];
obj_linkLibrary(pikaMain, pikaModules_py_a);
obj_run(
pikaMain,
"import requests\n"
"r = requests.get('http://www.rt-thread.com/service/rt-thread.txt')\n");
/* assert */
EXPECT_STREQ(
obj_getStr(pikaMain, "r.text"),
"RT-Thread is an open source IoT operating system from China, which "
"has strong scalability: from a tiny kernel running on a tiny core, "
"for example ARM Cortex-M0, or Cortex-M3/4/7, to a rich feature system "
"running on MIPS32, ARM Cortex-A8, ARM Cortex-A9 DualCore etc.\r\n");
/* deinit */
obj_deinit(pikaMain);
EXPECT_EQ(pikaMemNow(), 0);
}