adding threading module

This commit is contained in:
vbcssc 2024-10-12 16:53:18 +08:00
parent 2c3dd1223f
commit b3782c87b0
22 changed files with 2010 additions and 1 deletions

View File

@ -0,0 +1,83 @@
import _thread
import time
import threading
import PikaStdLib
# 共享资源
shared_resource = 0
# 互斥锁
mutex = threading.Lock()
# 线程函数
finished = 0
def thread_function(name, delay):
global shared_resource
global mutex, finished
print("delay : %s" % str(delay))
k = 0
i = 0
mem = PikaStdLib.MemChecker()
for i in range(5):
# while 1:
try:
# 获取互斥锁
print("%s try to acquire lock. #1" % name)
res = mutex.acquire(True, None)
print("res: %s" % str(res))
if 1: # 测试RLock或者Lock的超时加上
print("%s try to acquire lock. #2" % name)
res = mutex.acquire(True, 0.5)
print("res: %s" % str(res))
if res:
print("%s acquire lock SUCC." % name)
else:
print("%s acquire lock FAIL." % name)
# 打印当前线程名称和共享资源的值
print("Thread %s: Iteration %d, Shared Resource: %d" %
(name, i, shared_resource))
# 更新共享资源
shared_resource += 1
# 模拟工作时间
time.sleep(delay)
print("wake")
# 释放互斥锁
mutex.release()
mutex.release()
k += 1
print("%s i = %d." % (name, i))
# print('mem used now:')
# mem.now()
except:
print("------------- error ---------------")
print("%s exit , at last, i = %d." % (name, k))
finished += 1
# 主函数
def main():
# 创建第一个线程
_thread.start_new_thread(thread_function, ("Thread-1", 0.1))
time.sleep(0.5)
# 创建第二个线程
_thread.start_new_thread(thread_function, ("Thread-2", 0.2))
# 主线程等待子线程结束
# 由于 _thread 没有 join 方法,我们通过 sleep 来模拟等待
# time.sleep(60)
while finished < 2:
time.sleep(1)
main()

View File

@ -0,0 +1,458 @@
#include "PikaPlatformEx.h"
//----------------------------- mutex -------------------------------
// 带超时的互斥锁加锁
int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m,
pika_bool block,
Arg* timeout) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
ArgType timout_type = arg_getType(timeout);
pika_float timeout_f;
int result;
if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT ||
timout_type == ARG_TYPE_NONE)) {
return PIKA_RES_ERR_INVALID_PARAM;
}
if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) {
// printf("==== #01\n");
if (timout_type == ARG_TYPE_FLOAT) {
timeout_f = arg_getFloat(timeout);
}
if (timout_type == ARG_TYPE_INT) {
int timeout_d = arg_getInt(timeout);
timeout_f = (pika_float)timeout_d;
// printf("==== #04 %lf\n", timeout_f);
}
if (timeout_f < 0.0f) {
return PIKA_RES_ERR_INVALID_PARAM;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间
// 将浮点数秒转换为秒和纳秒
long sec = (long)timeout_f;
long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0);
ts.tv_sec += sec;
ts.tv_nsec += nsec;
// 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数
if (ts.tv_nsec >= 1000000000) {
ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数
ts.tv_sec += 1; // 增加 1 秒
}
pika_GIL_EXIT();
result = pthread_mutex_timedlock(&m->mutex, &ts);
pika_GIL_ENTER();
return result == 0 ? 0 : -1;
} else if (timout_type == ARG_TYPE_NONE) {
if (block) {
// printf("==== #02\n");
pika_GIL_EXIT();
result = pthread_mutex_lock(&m->mutex);
pika_GIL_ENTER();
return result == 0 ? 0 : -1;
} else {
// printf("==== #03\n");
pika_GIL_EXIT();
result = pthread_mutex_trylock(&m->mutex);
pika_GIL_ENTER();
return result == 0 ? 0 : -1;
}
} else {
return PIKA_RES_ERR_INVALID_PARAM;
}
#elif PIKA_FREERTOS_ENABLE
if (pdTRUE == xSemaphoreTake(m->mutex, (TickType_t)(timeout * 1000.0f))) {
return 0;
}
return -1;
#elif PIKA_RTTHREAD_ENABLE
return rt_mutex_take((m->mutex), (rt_tick_t)(timeout * RT_TICK_PER_SECOND));
#elif PIKA_ZEUSOS_ENABLE
return zos_mutex_lock(m->mutex, (uint32_t)(timeout * 1000.0f));
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
return -1;
#endif
}
//----------------------------- rtmutex -------------------------------
// 初始化递归互斥锁
void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_mutexattr_t attr;
if (pthread_mutexattr_init(&attr) != 0) {
perror("pthread_mutexattr_init");
exit(EXIT_FAILURE);
}
// 设置互斥锁类型为递归互斥锁
if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0) {
perror("pthread_mutexattr_settype");
pthread_mutexattr_destroy(&attr);
exit(EXIT_FAILURE);
}
pthread_mutex_init(&rtm->mutex, &attr);
pthread_cond_init(&rtm->cond, NULL);
rtm->owner = (pthread_t)0;
rtm->count = 0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 销毁递归互斥锁
void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_cond_destroy(&rtm->cond);
pthread_mutex_destroy(&rtm->mutex);
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 带超时的递归互斥锁加锁
int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm,
pika_bool block,
Arg* timeout) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
ArgType timout_type = arg_getType(timeout);
pika_float timeout_f;
int result;
if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT ||
timout_type == ARG_TYPE_NONE)) {
return PIKA_RES_ERR_INVALID_PARAM;
}
pika_GIL_EXIT();
pthread_mutex_lock(&rtm->mutex);
pika_GIL_ENTER();
if (rtm->owner == pthread_self()) {
// 如果当前线程已经持有锁则递归深度加1
rtm->count++;
// printf("rtm->count = %d\n", rtm->count);
pthread_mutex_unlock(&rtm->mutex);
// printf("succ\n");
return 0;
}
if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) {
// printf("==== #01\n");
if (timout_type == ARG_TYPE_FLOAT) {
timeout_f = arg_getFloat(timeout);
}
if (timout_type == ARG_TYPE_INT) {
int timeout_d = arg_getInt(timeout);
timeout_f = (pika_float)timeout_d;
// printf("==== #04 %lf\n", timeout_f);
}
if (timeout_f < 0.0f) {
return PIKA_RES_ERR_INVALID_PARAM;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间
// 将浮点数秒转换为秒和纳秒
long sec = (long)timeout_f;
long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0);
ts.tv_sec += sec;
ts.tv_nsec += nsec;
// 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数
if (ts.tv_nsec >= 1000000000) {
ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数
ts.tv_sec += 1; // 增加 1 秒
}
// 等待直到获得锁或超时
while (rtm->owner != (pthread_t)0) {
pika_GIL_EXIT();
result = pthread_cond_timedwait(&rtm->cond, &rtm->mutex, &ts);
pika_GIL_ENTER();
if (result != 0) {
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
}
// 设置当前线程为锁的持有者
rtm->owner = pthread_self();
rtm->count = 1;
pthread_mutex_unlock(&rtm->mutex);
return 0;
} else if (timout_type == ARG_TYPE_NONE) {
if (block) {
// 永久等待
while (rtm->owner != (pthread_t)0) {
pika_GIL_EXIT();
result = pthread_cond_wait(&rtm->cond, &rtm->mutex);
pika_GIL_ENTER();
if (result != 0) {
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
}
// 设置当前线程为锁的持有者
rtm->owner = pthread_self();
rtm->count = 1;
pthread_mutex_unlock(&rtm->mutex);
return 0;
} else {
// 非阻塞模式
if (rtm->owner == (pthread_t)0) {
// 如果没有其他线程持有锁,获取锁
rtm->owner = pthread_self();
rtm->count = 1;
pthread_mutex_unlock(&rtm->mutex);
return 0;
} else {
// 如果已经有其他线程持有锁,立即返回 -1
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
}
} else {
return PIKA_RES_ERR_INVALID_PARAM;
}
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
#if 1
// 释放递归互斥锁
int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_t self = pthread_self();
pika_GIL_EXIT();
pthread_mutex_lock(&rtm->mutex);
pika_GIL_ENTER();
// printf("rtm->owner = %lu\n", rtm->owner);
if (rtm->owner != self) {
perror("Attempt to unlock a mutex not owned by the current thread");
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
rtm->count--;
if (rtm->count == 0) {
rtm->owner = (pthread_t)0;
pthread_cond_signal(&rtm->cond);
// printf("rtm->owner = %lu\n", rtm->owner);
}
pthread_mutex_unlock(&rtm->mutex);
return 0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
#endif
// 检查递归互斥锁是否已被当前线程获取
int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_t self = pthread_self();
pika_GIL_EXIT();
pthread_mutex_lock(&rtm->mutex);
pika_GIL_ENTER();
int is_locked = (rtm->owner == self && rtm->count > 0);
pthread_mutex_unlock(&rtm->mutex);
return is_locked;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
//----------------------------- cond -------------------------------
void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pika_platform_thread_rtmutex_init(&cond->rtmutex);
pthread_cond_init(&cond->cond, NULL);
cond->owner = (pthread_t)0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_cond_destroy(&cond->cond);
pika_platform_thread_rtmutex_destroy(&cond->rtmutex);
cond->owner = (pthread_t)0; // 释放资源后重置 owner
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 检查当前线程是否持有互斥锁
static int is_mutex_owned(pika_platform_thread_cond_t* cond) {
// pthread_t current_thread = pthread_self();
// 使用 pthread_mutex_trylock 来检查是否已经持有锁
if (pthread_mutex_trylock(&cond->rtmutex.mutex) == EBUSY) {
// 如果锁已经被持有,尝试解锁并检查是否是当前线程持有的
if (pthread_mutex_unlock(&cond->rtmutex.mutex) == 0) {
return 1; // 当前线程持有锁
}
}
return 0;
}
// 带阻塞和超时功能的条件变量等待
int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond,
Arg* timeout) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
ArgType timout_type = arg_getType(timeout);
pika_float timeout_f;
int result;
// 检查是否已经获得了互斥锁
if (!is_mutex_owned(cond)) {
return -1;
}
if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT ||
timout_type == ARG_TYPE_NONE)) {
return PIKA_RES_ERR_INVALID_PARAM;
}
if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) {
if (timout_type == ARG_TYPE_FLOAT) {
timeout_f = arg_getFloat(timeout);
} else {
int timeout_d = arg_getInt(timeout);
timeout_f = (pika_float)timeout_d;
}
if (timeout_f < 0.0f) {
return PIKA_RES_ERR_INVALID_PARAM;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间
// 将浮点数秒转换为秒和纳秒
long sec = (long)timeout_f;
long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0);
ts.tv_sec += sec;
ts.tv_nsec += nsec;
// 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数
if (ts.tv_nsec >= 1000000000) {
ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数
ts.tv_sec += 1; // 增加 1 秒
}
// 等待直到被通知或超时
pika_GIL_EXIT();
result = pthread_cond_timedwait(&cond->cond, &cond->rtmutex.mutex, &ts);
pika_GIL_ENTER();
if (result != 0) {
if (result == ETIMEDOUT) {
return -1; // 超时
}
perror("pthread_cond_timedwait");
return -1; // 其他错误
}
} else if (timout_type == ARG_TYPE_NONE) {
// 永久等待
pika_GIL_EXIT();
result = pthread_cond_wait(&cond->cond, &cond->rtmutex.mutex);
pika_GIL_ENTER();
if (result != 0) {
perror("pthread_cond_wait");
return -1;
}
} else {
return PIKA_RES_ERR_INVALID_PARAM;
}
return 0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 信号量通知
int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
int result;
result = pthread_cond_signal(&cond->cond);
cond->owner = (pthread_t)0; // 通知后重置 owner
return result == 0 ? 0 : -1;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 信号量广播
int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
int result;
result = pthread_cond_broadcast(&cond->cond);
cond->owner = (pthread_t)0; // 广播后重置 owner
return result == 0 ? 0 : -1;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}

View File

@ -0,0 +1,53 @@
#ifndef ___PikaPlatformEx__H
#define ___PikaPlatformEx__H
#include "PikaPlatform.h"
#include "PikaObj.h"
#include <time.h>
#include <pthread.h>
#include <errno.h>
int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m,
pika_bool block,
Arg* timeout);
//-------------------------------
// 递归带超时互斥锁结构体
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t owner; // 当前持有锁的线程 ID
int count; // 递归深度
} pika_platform_thread_rtmutex_t;
// 初始化递归互斥锁
void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm);
// 销毁递归互斥锁
void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm);
// 带超时的递归互斥锁加锁
int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm,
pika_bool block,
Arg* timeout);
// 递归互斥锁解锁
int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm);
// 是否已获得锁
int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm);
//-------------------------------------
typedef struct {
pika_platform_thread_rtmutex_t rtmutex;
pthread_cond_t cond;
pthread_t owner; // 当前持有锁的线程 ID
} pika_platform_thread_cond_t;
void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond);
void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond);
int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond);
int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond);
int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond,
Arg* timeout);
#endif

View File

@ -0,0 +1,180 @@
#include "pika_hal_ex.h"
pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity) {
pika_hal_CircularPtrQueue* cb = (pika_hal_CircularPtrQueue*)pikaMalloc(
sizeof(pika_hal_CircularPtrQueue));
if (NULL == cb) {
return NULL;
}
cb->head = 0;
cb->tail = 0;
cb->count = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_init(&cb->mutex);
#endif
cb->capacity = capacity;
cb->buffer = (void**)pikaMalloc(capacity *
sizeof(void*)); // 分配足够的空间来存储指针
if (NULL == cb->buffer) {
pikaFree(cb, sizeof(pika_hal_CircularPtrQueue));
return NULL;
}
return cb;
}
int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb,
void* data) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == cb->capacity) {
ret = -1;
goto __exit;
}
cb->buffer[cb->tail] = data;
cb->tail = (cb->tail + 1) % cb->capacity;
cb->count++;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb,
void** value) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == 0) {
ret = -1;
goto __exit;
}
*value = cb->buffer[cb->head];
cb->head = (cb->head + 1) % cb->capacity;
cb->count--;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb) {
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
pikaFree(cb->buffer, cb->capacity * sizeof(void*)); // 释放指针数组
cb->buffer = NULL;
cb->head = 0;
cb->tail = 0;
cb->count = 0;
cb->capacity = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_destroy(&cb->mutex);
#endif
pikaFree(cb, sizeof(pika_hal_CircularPtrQueue));
return 0;
}
size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb) {
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
size_t count = cb->count;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return count;
}
int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb) {
return cb->count == 0;
}
int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb) {
return cb->count == cb->capacity;
}
int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb,
void** value) {
if (cb->count == 0) {
return -1;
}
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
*value = cb->buffer[cb->head];
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return 0;
}
// 以下待测试
int pika_hal_circularPtrQueue_enqueueHead(pika_hal_CircularPtrQueue* cb,
void* data) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == cb->capacity) {
ret = -1; // 队列已满
goto __exit;
}
// 更新 head 指针前的位置,然后更新 head
cb->head = (cb->head - 1 + cb->capacity) % cb->capacity;
cb->buffer[cb->head] = data;
cb->count++;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_dequeueTail(pika_hal_CircularPtrQueue* cb,
void** value) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == 0) {
ret = -1; // 队列为空
goto __exit;
}
*value = cb->buffer[cb->tail - 1];
cb->tail = (cb->tail - 1 + cb->capacity) % cb->capacity;
cb->count--;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_peekTail(pika_hal_CircularPtrQueue* cb,
void** value) {
if (cb->count == 0) {
return -1; // 队列为空
}
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
*value = cb->buffer[(cb->tail - 1 + cb->capacity) % cb->capacity];
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return 0;
}

View File

@ -0,0 +1,44 @@
#ifndef PIKA_HAL_CIRCULAR_PTR_QUEUE_H
#define PIKA_HAL_CIRCULAR_PTR_QUEUE_H
#include "pika_hal.h"
#include "PikaObj.h"
// 定义循环指针队列的结构体
typedef struct pika_hal_CircularPtrQueue {
void** buffer; // 存储元素的缓冲区
size_t head; // 队头指针
size_t tail; // 队尾指针
size_t count; // 当前元素数量
size_t capacity; // 缓冲区容量
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_t mutex; // 互斥锁
#endif
} pika_hal_CircularPtrQueue;
// 创建一个新的循环指针队列
pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity);
// 向队列中添加一个元素
int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb,
void* data);
// 从队列中移除一个元素
int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb,
void** value);
// 销毁队列并释放相关资源
int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb);
// 获取队列中的元素数量
size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb);
// 检查队列是否为空
int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb);
// 检查队列是否已满
int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb);
// 查看队列头部的元素,但不移除它
int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb, void** value);
#endif // PIKA_HAL_CIRCULAR_PTR_QUEUE_H

View File

@ -0,0 +1,5 @@
#include "threading.h"
int64_t threading_get_ident(PikaObj* self) {
return (int64_t)pika_platform_thread_self();
}

View File

@ -0,0 +1,52 @@
import _thread
from PikaObj import *
def get_ident() ->int64: ...
class Lock():
def __init__(self): # real signature unknown
self._mutex_ = None
def acquire(self, block:bool, timeout:any) -> bool: ... # real signature unknown; restored from __doc__
def locked(self) -> bool: ...# real signature unknown; restored from __doc__
def release(self): ... # real signature unknown; restored from __doc__
def __del__(self): ...
class RLock():
def __init__(self): # real signature unknown
self._rmutex_ = None
def acquire(self, block:bool, timeout:any) -> bool: ...# real signature unknown; restored from __doc__
def locked(self) -> bool: ... # real signature unknown; restored from __doc__
def release(self): ...# real signature unknown; restored from __doc__
def __del__(self): ...
class Condition:
def __init__(self): # real signature unknown
self._cond_ = None
self._rtmetux_ = None
def wait(self, timeout:any) -> bool: ...
def notify(self, n:int): ...
def notify_all(self): ...
def _is_owned(self) -> bool: ...
def __del__(self): ...
def acquire(self, block:bool, timeout:any) -> bool: ...
def release(self): ...

View File

@ -0,0 +1,45 @@
#include "threading_Condition.h"
#include "PikaPlatformEx.h"
void threading_Condition___del__(PikaObj* self) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
pika_platform_thread_cond_destroy(cond);
pikaFree(cond, sizeof(pika_platform_thread_cond_t));
}
void threading_Condition___init__(PikaObj* self) {
pika_platform_thread_cond_t* cond =
pikaMalloc(sizeof(pika_platform_thread_cond_t));
pika_platform_thread_cond_init(cond);
obj_setPtr(self, "_cond_", cond);
}
pika_bool threading_Condition__is_owned(PikaObj* self) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
if (cond->owner != 0) {
return pika_true;
} else {
return pika_false;
}
}
void threading_Condition_notify(PikaObj* self, int n) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
pika_platform_thread_cond_signal(cond);
}
void threading_Condition_notify_all(PikaObj* self) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
pika_platform_thread_cond_broadcast(cond);
}
pika_bool threading_Condition_wait(PikaObj* self, Arg* timeout) {
// pika_platform_thread_cond_t * cond = obj_getPtr(self, "_cond_");
return pika_false;
}
pika_bool threading_Condition_acquire(PikaObj* self,
pika_bool block,
Arg* timeout) {
return pika_false;
}
void threading_Condition_release(PikaObj* self) {}

View File

@ -0,0 +1,49 @@
#include "threading_Lock.h"
#include "PikaPlatformEx.h"
void threading_Lock___del__(PikaObj* self) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
// pika_platform_thread_mutex_unlock(m);
pika_platform_thread_mutex_destroy(m);
pikaFree(m, sizeof(pika_platform_thread_mutex_t));
}
void threading_Lock___init__(PikaObj* self) {
pika_platform_thread_mutex_t* m =
pikaMalloc(sizeof(pika_platform_thread_mutex_t));
pika_platform_thread_mutex_init(m);
obj_setPtr(self, "_mutex_", m);
}
pika_bool threading_Lock_acquire(PikaObj* self, pika_bool block, Arg* timeout) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
int result = pika_platform_thread_mutex_timedlock(m, block, timeout);
if (result == PIKA_RES_ERR_INVALID_PARAM) {
obj_setErrorCode(self, PIKA_RES_ERR_INVALID_PARAM);
obj_setSysOut(self, "invalid param!");
}
return result == 0 ? pika_true : pika_false;
}
pika_bool threading_Lock_locked(PikaObj* self) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
pika_GIL_EXIT();
int result = pika_platform_thread_mutex_trylock(m);
pika_GIL_ENTER();
if (result == 0) {
// 成功获得了锁,需要解锁
// pika_GIL_EXIT();
pika_platform_thread_mutex_unlock(m);
// pika_GIL_ENTER();/* */
return pika_false; // 锁未被占用/* */
} else {
// 锁已被占用或发生了其他错误
// perror("pthread_mutex_trylock");
return pika_true;
}
}
void threading_Lock_release(PikaObj* self) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
// pika_GIL_EXIT();
pika_platform_thread_mutex_unlock(m);
// pika_GIL_ENTER();
}

View File

@ -0,0 +1,34 @@
#include "threading_Lock.h"
#include "PikaPlatformEx.h"
void threading_RLock___del__(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
pika_platform_thread_rtmutex_destroy(rm);
pikaFree(rm, sizeof(pika_platform_thread_rtmutex_t));
}
void threading_RLock___init__(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm =
pikaMalloc(sizeof(pika_platform_thread_rtmutex_t));
pika_platform_thread_rtmutex_init(rm);
obj_setPtr(self, "_rmutex_", rm);
}
pika_bool threading_RLock_acquire(PikaObj* self,
pika_bool block,
Arg* timeout) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
int result = pika_platform_thread_rtmutex_lock(rm, block, timeout);
return result == 0 ? pika_true : pika_false;
}
pika_bool threading_RLock_locked(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
int result = pika_platform_thread_rtmutex_locked(rm);
return result != 0 ? pika_true : pika_false;
}
void threading_RLock_release(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
pika_platform_thread_rtmutex_unlock(rm);
}

View File

@ -44,4 +44,5 @@ flashdb
fuzzypid
pika_lvgl
import_err
_from_import_pyi
_from_import_pyi
threading

View File

@ -0,0 +1,458 @@
#include "PikaPlatformEx.h"
//----------------------------- mutex -------------------------------
// 带超时的互斥锁加锁
int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m,
pika_bool block,
Arg* timeout) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
ArgType timout_type = arg_getType(timeout);
pika_float timeout_f;
int result;
if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT ||
timout_type == ARG_TYPE_NONE)) {
return PIKA_RES_ERR_INVALID_PARAM;
}
if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) {
// printf("==== #01\n");
if (timout_type == ARG_TYPE_FLOAT) {
timeout_f = arg_getFloat(timeout);
}
if (timout_type == ARG_TYPE_INT) {
int timeout_d = arg_getInt(timeout);
timeout_f = (pika_float)timeout_d;
// printf("==== #04 %lf\n", timeout_f);
}
if (timeout_f < 0.0f) {
return PIKA_RES_ERR_INVALID_PARAM;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间
// 将浮点数秒转换为秒和纳秒
long sec = (long)timeout_f;
long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0);
ts.tv_sec += sec;
ts.tv_nsec += nsec;
// 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数
if (ts.tv_nsec >= 1000000000) {
ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数
ts.tv_sec += 1; // 增加 1 秒
}
pika_GIL_EXIT();
result = pthread_mutex_timedlock(&m->mutex, &ts);
pika_GIL_ENTER();
return result == 0 ? 0 : -1;
} else if (timout_type == ARG_TYPE_NONE) {
if (block) {
// printf("==== #02\n");
pika_GIL_EXIT();
result = pthread_mutex_lock(&m->mutex);
pika_GIL_ENTER();
return result == 0 ? 0 : -1;
} else {
// printf("==== #03\n");
pika_GIL_EXIT();
result = pthread_mutex_trylock(&m->mutex);
pika_GIL_ENTER();
return result == 0 ? 0 : -1;
}
} else {
return PIKA_RES_ERR_INVALID_PARAM;
}
#elif PIKA_FREERTOS_ENABLE
if (pdTRUE == xSemaphoreTake(m->mutex, (TickType_t)(timeout * 1000.0f))) {
return 0;
}
return -1;
#elif PIKA_RTTHREAD_ENABLE
return rt_mutex_take((m->mutex), (rt_tick_t)(timeout * RT_TICK_PER_SECOND));
#elif PIKA_ZEUSOS_ENABLE
return zos_mutex_lock(m->mutex, (uint32_t)(timeout * 1000.0f));
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
return -1;
#endif
}
//----------------------------- rtmutex -------------------------------
// 初始化递归互斥锁
void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_mutexattr_t attr;
if (pthread_mutexattr_init(&attr) != 0) {
perror("pthread_mutexattr_init");
exit(EXIT_FAILURE);
}
// 设置互斥锁类型为递归互斥锁
if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0) {
perror("pthread_mutexattr_settype");
pthread_mutexattr_destroy(&attr);
exit(EXIT_FAILURE);
}
pthread_mutex_init(&rtm->mutex, &attr);
pthread_cond_init(&rtm->cond, NULL);
rtm->owner = (pthread_t)0;
rtm->count = 0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 销毁递归互斥锁
void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_cond_destroy(&rtm->cond);
pthread_mutex_destroy(&rtm->mutex);
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 带超时的递归互斥锁加锁
int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm,
pika_bool block,
Arg* timeout) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
ArgType timout_type = arg_getType(timeout);
pika_float timeout_f;
int result;
if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT ||
timout_type == ARG_TYPE_NONE)) {
return PIKA_RES_ERR_INVALID_PARAM;
}
pika_GIL_EXIT();
pthread_mutex_lock(&rtm->mutex);
pika_GIL_ENTER();
if (rtm->owner == pthread_self()) {
// 如果当前线程已经持有锁则递归深度加1
rtm->count++;
// printf("rtm->count = %d\n", rtm->count);
pthread_mutex_unlock(&rtm->mutex);
// printf("succ\n");
return 0;
}
if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) {
// printf("==== #01\n");
if (timout_type == ARG_TYPE_FLOAT) {
timeout_f = arg_getFloat(timeout);
}
if (timout_type == ARG_TYPE_INT) {
int timeout_d = arg_getInt(timeout);
timeout_f = (pika_float)timeout_d;
// printf("==== #04 %lf\n", timeout_f);
}
if (timeout_f < 0.0f) {
return PIKA_RES_ERR_INVALID_PARAM;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间
// 将浮点数秒转换为秒和纳秒
long sec = (long)timeout_f;
long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0);
ts.tv_sec += sec;
ts.tv_nsec += nsec;
// 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数
if (ts.tv_nsec >= 1000000000) {
ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数
ts.tv_sec += 1; // 增加 1 秒
}
// 等待直到获得锁或超时
while (rtm->owner != (pthread_t)0) {
pika_GIL_EXIT();
result = pthread_cond_timedwait(&rtm->cond, &rtm->mutex, &ts);
pika_GIL_ENTER();
if (result != 0) {
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
}
// 设置当前线程为锁的持有者
rtm->owner = pthread_self();
rtm->count = 1;
pthread_mutex_unlock(&rtm->mutex);
return 0;
} else if (timout_type == ARG_TYPE_NONE) {
if (block) {
// 永久等待
while (rtm->owner != (pthread_t)0) {
pika_GIL_EXIT();
result = pthread_cond_wait(&rtm->cond, &rtm->mutex);
pika_GIL_ENTER();
if (result != 0) {
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
}
// 设置当前线程为锁的持有者
rtm->owner = pthread_self();
rtm->count = 1;
pthread_mutex_unlock(&rtm->mutex);
return 0;
} else {
// 非阻塞模式
if (rtm->owner == (pthread_t)0) {
// 如果没有其他线程持有锁,获取锁
rtm->owner = pthread_self();
rtm->count = 1;
pthread_mutex_unlock(&rtm->mutex);
return 0;
} else {
// 如果已经有其他线程持有锁,立即返回 -1
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
}
} else {
return PIKA_RES_ERR_INVALID_PARAM;
}
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
#if 1
// 释放递归互斥锁
int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_t self = pthread_self();
pika_GIL_EXIT();
pthread_mutex_lock(&rtm->mutex);
pika_GIL_ENTER();
// printf("rtm->owner = %lu\n", rtm->owner);
if (rtm->owner != self) {
perror("Attempt to unlock a mutex not owned by the current thread");
pthread_mutex_unlock(&rtm->mutex);
return -1;
}
rtm->count--;
if (rtm->count == 0) {
rtm->owner = (pthread_t)0;
pthread_cond_signal(&rtm->cond);
// printf("rtm->owner = %lu\n", rtm->owner);
}
pthread_mutex_unlock(&rtm->mutex);
return 0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
#endif
// 检查递归互斥锁是否已被当前线程获取
int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_t self = pthread_self();
pika_GIL_EXIT();
pthread_mutex_lock(&rtm->mutex);
pika_GIL_ENTER();
int is_locked = (rtm->owner == self && rtm->count > 0);
pthread_mutex_unlock(&rtm->mutex);
return is_locked;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
//----------------------------- cond -------------------------------
void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pika_platform_thread_rtmutex_init(&cond->rtmutex);
pthread_cond_init(&cond->cond, NULL);
cond->owner = (pthread_t)0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
pthread_cond_destroy(&cond->cond);
pika_platform_thread_rtmutex_destroy(&cond->rtmutex);
cond->owner = (pthread_t)0; // 释放资源后重置 owner
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 检查当前线程是否持有互斥锁
static int is_mutex_owned(pika_platform_thread_cond_t* cond) {
// pthread_t current_thread = pthread_self();
// 使用 pthread_mutex_trylock 来检查是否已经持有锁
if (pthread_mutex_trylock(&cond->rtmutex.mutex) == EBUSY) {
// 如果锁已经被持有,尝试解锁并检查是否是当前线程持有的
if (pthread_mutex_unlock(&cond->rtmutex.mutex) == 0) {
return 1; // 当前线程持有锁
}
}
return 0;
}
// 带阻塞和超时功能的条件变量等待
int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond,
Arg* timeout) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
ArgType timout_type = arg_getType(timeout);
pika_float timeout_f;
int result;
// 检查是否已经获得了互斥锁
if (!is_mutex_owned(cond)) {
return -1;
}
if (!(timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT ||
timout_type == ARG_TYPE_NONE)) {
return PIKA_RES_ERR_INVALID_PARAM;
}
if (timout_type == ARG_TYPE_FLOAT || timout_type == ARG_TYPE_INT) {
if (timout_type == ARG_TYPE_FLOAT) {
timeout_f = arg_getFloat(timeout);
} else {
int timeout_d = arg_getInt(timeout);
timeout_f = (pika_float)timeout_d;
}
if (timeout_f < 0.0f) {
return PIKA_RES_ERR_INVALID_PARAM;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts); // 获取当前时间
// 将浮点数秒转换为秒和纳秒
long sec = (long)timeout_f;
long nsec = (long)((timeout_f - (pika_float)sec) * 1000000000.0);
ts.tv_sec += sec;
ts.tv_nsec += nsec;
// 如果纳秒数超过 1 秒,则需要调整秒数和纳秒数
if (ts.tv_nsec >= 1000000000) {
ts.tv_nsec -= 1000000000; // 减去 1 秒的纳秒数
ts.tv_sec += 1; // 增加 1 秒
}
// 等待直到被通知或超时
pika_GIL_EXIT();
result = pthread_cond_timedwait(&cond->cond, &cond->rtmutex.mutex, &ts);
pika_GIL_ENTER();
if (result != 0) {
if (result == ETIMEDOUT) {
return -1; // 超时
}
perror("pthread_cond_timedwait");
return -1; // 其他错误
}
} else if (timout_type == ARG_TYPE_NONE) {
// 永久等待
pika_GIL_EXIT();
result = pthread_cond_wait(&cond->cond, &cond->rtmutex.mutex);
pika_GIL_ENTER();
if (result != 0) {
perror("pthread_cond_wait");
return -1;
}
} else {
return PIKA_RES_ERR_INVALID_PARAM;
}
return 0;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 信号量通知
int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
int result;
result = pthread_cond_signal(&cond->cond);
cond->owner = (pthread_t)0; // 通知后重置 owner
return result == 0 ? 0 : -1;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}
// 信号量广播
int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond) {
#if defined(__linux) || (PIKA_WIN_PTHREAD_ENABLE)
int result;
result = pthread_cond_broadcast(&cond->cond);
cond->owner = (pthread_t)0; // 广播后重置 owner
return result == 0 ? 0 : -1;
#elif PIKA_FREERTOS_ENABLE
#elif PIKA_RTTHREAD_ENABLE
#elif PIKA_ZEUSOS_ENABLE
#else
WEAK_FUNCTION_NEED_OVERRIDE_ERROR(_);
#endif
}

View File

@ -0,0 +1,53 @@
#ifndef ___PikaPlatformEx__H
#define ___PikaPlatformEx__H
#include "PikaPlatform.h"
#include "PikaObj.h"
#include <time.h>
#include <pthread.h>
#include <errno.h>
int pika_platform_thread_mutex_timedlock(pika_platform_thread_mutex_t* m,
pika_bool block,
Arg* timeout);
//-------------------------------
// 递归带超时互斥锁结构体
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t owner; // 当前持有锁的线程 ID
int count; // 递归深度
} pika_platform_thread_rtmutex_t;
// 初始化递归互斥锁
void pika_platform_thread_rtmutex_init(pika_platform_thread_rtmutex_t* rtm);
// 销毁递归互斥锁
void pika_platform_thread_rtmutex_destroy(pika_platform_thread_rtmutex_t* rtm);
// 带超时的递归互斥锁加锁
int pika_platform_thread_rtmutex_lock(pika_platform_thread_rtmutex_t* rtm,
pika_bool block,
Arg* timeout);
// 递归互斥锁解锁
int pika_platform_thread_rtmutex_unlock(pika_platform_thread_rtmutex_t* rtm);
// 是否已获得锁
int pika_platform_thread_rtmutex_locked(pika_platform_thread_rtmutex_t* rtm);
//-------------------------------------
typedef struct {
pika_platform_thread_rtmutex_t rtmutex;
pthread_cond_t cond;
pthread_t owner; // 当前持有锁的线程 ID
} pika_platform_thread_cond_t;
void pika_platform_thread_cond_init(pika_platform_thread_cond_t* cond);
void pika_platform_thread_cond_destroy(pika_platform_thread_cond_t* cond);
int pika_platform_thread_cond_signal(pika_platform_thread_cond_t* cond);
int pika_platform_thread_cond_broadcast(pika_platform_thread_cond_t* cond);
int pika_platform_thread_cond_timedwait(pika_platform_thread_cond_t* cond,
Arg* timeout);
#endif

View File

@ -0,0 +1,180 @@
#include "pika_hal_ex.h"
pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity) {
pika_hal_CircularPtrQueue* cb = (pika_hal_CircularPtrQueue*)pikaMalloc(
sizeof(pika_hal_CircularPtrQueue));
if (NULL == cb) {
return NULL;
}
cb->head = 0;
cb->tail = 0;
cb->count = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_init(&cb->mutex);
#endif
cb->capacity = capacity;
cb->buffer = (void**)pikaMalloc(capacity *
sizeof(void*)); // 分配足够的空间来存储指针
if (NULL == cb->buffer) {
pikaFree(cb, sizeof(pika_hal_CircularPtrQueue));
return NULL;
}
return cb;
}
int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb,
void* data) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == cb->capacity) {
ret = -1;
goto __exit;
}
cb->buffer[cb->tail] = data;
cb->tail = (cb->tail + 1) % cb->capacity;
cb->count++;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb,
void** value) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == 0) {
ret = -1;
goto __exit;
}
*value = cb->buffer[cb->head];
cb->head = (cb->head + 1) % cb->capacity;
cb->count--;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb) {
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
pikaFree(cb->buffer, cb->capacity * sizeof(void*)); // 释放指针数组
cb->buffer = NULL;
cb->head = 0;
cb->tail = 0;
cb->count = 0;
cb->capacity = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_destroy(&cb->mutex);
#endif
pikaFree(cb, sizeof(pika_hal_CircularPtrQueue));
return 0;
}
size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb) {
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
size_t count = cb->count;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return count;
}
int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb) {
return cb->count == 0;
}
int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb) {
return cb->count == cb->capacity;
}
int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb,
void** value) {
if (cb->count == 0) {
return -1;
}
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
*value = cb->buffer[cb->head];
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return 0;
}
// 以下待测试
int pika_hal_circularPtrQueue_enqueueHead(pika_hal_CircularPtrQueue* cb,
void* data) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == cb->capacity) {
ret = -1; // 队列已满
goto __exit;
}
// 更新 head 指针前的位置,然后更新 head
cb->head = (cb->head - 1 + cb->capacity) % cb->capacity;
cb->buffer[cb->head] = data;
cb->count++;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_dequeueTail(pika_hal_CircularPtrQueue* cb,
void** value) {
int ret = 0;
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
if (cb->count == 0) {
ret = -1; // 队列为空
goto __exit;
}
*value = cb->buffer[cb->tail - 1];
cb->tail = (cb->tail - 1 + cb->capacity) % cb->capacity;
cb->count--;
__exit:
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return ret;
}
int pika_hal_circularPtrQueue_peekTail(pika_hal_CircularPtrQueue* cb,
void** value) {
if (cb->count == 0) {
return -1; // 队列为空
}
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_lock(&cb->mutex);
#endif
*value = cb->buffer[(cb->tail - 1 + cb->capacity) % cb->capacity];
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_unlock(&cb->mutex);
#endif
return 0;
}

View File

@ -0,0 +1,44 @@
#ifndef PIKA_HAL_CIRCULAR_PTR_QUEUE_H
#define PIKA_HAL_CIRCULAR_PTR_QUEUE_H
#include "pika_hal.h"
#include "PikaObj.h"
// 定义循环指针队列的结构体
typedef struct pika_hal_CircularPtrQueue {
void** buffer; // 存储元素的缓冲区
size_t head; // 队头指针
size_t tail; // 队尾指针
size_t count; // 当前元素数量
size_t capacity; // 缓冲区容量
#if PIKA_HAL_CIRCULAR_QUEUE_MUTEX_ENABLE
pika_platform_thread_mutex_t mutex; // 互斥锁
#endif
} pika_hal_CircularPtrQueue;
// 创建一个新的循环指针队列
pika_hal_CircularPtrQueue* pika_hal_circularPtrQueue_create(size_t capacity);
// 向队列中添加一个元素
int pika_hal_circularPtrQueue_enqueue(pika_hal_CircularPtrQueue* cb,
void* data);
// 从队列中移除一个元素
int pika_hal_circularPtrQueue_dequeue(pika_hal_CircularPtrQueue* cb,
void** value);
// 销毁队列并释放相关资源
int pika_hal_circularPtrQueue_deinit(pika_hal_CircularPtrQueue* cb);
// 获取队列中的元素数量
size_t pika_hal_circularPtrQueue_getCount(pika_hal_CircularPtrQueue* cb);
// 检查队列是否为空
int pika_hal_circularPtrQueue_isEmpty(pika_hal_CircularPtrQueue* cb);
// 检查队列是否已满
int pika_hal_circularPtrQueue_isFull(pika_hal_CircularPtrQueue* cb);
// 查看队列头部的元素,但不移除它
int pika_hal_circularPtrQueue_peek(pika_hal_CircularPtrQueue* cb, void** value);
#endif // PIKA_HAL_CIRCULAR_PTR_QUEUE_H

View File

@ -0,0 +1,5 @@
#include "threading.h"
int64_t threading_get_ident(PikaObj* self) {
return (int64_t)pika_platform_thread_self();
}

View File

@ -0,0 +1,45 @@
#include "threading_Condition.h"
#include "PikaPlatformEx.h"
void threading_Condition___del__(PikaObj* self) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
pika_platform_thread_cond_destroy(cond);
pikaFree(cond, sizeof(pika_platform_thread_cond_t));
}
void threading_Condition___init__(PikaObj* self) {
pika_platform_thread_cond_t* cond =
pikaMalloc(sizeof(pika_platform_thread_cond_t));
pika_platform_thread_cond_init(cond);
obj_setPtr(self, "_cond_", cond);
}
pika_bool threading_Condition__is_owned(PikaObj* self) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
if (cond->owner != 0) {
return pika_true;
} else {
return pika_false;
}
}
void threading_Condition_notify(PikaObj* self, int n) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
pika_platform_thread_cond_signal(cond);
}
void threading_Condition_notify_all(PikaObj* self) {
pika_platform_thread_cond_t* cond = obj_getPtr(self, "_cond_");
pika_platform_thread_cond_broadcast(cond);
}
pika_bool threading_Condition_wait(PikaObj* self, Arg* timeout) {
// pika_platform_thread_cond_t * cond = obj_getPtr(self, "_cond_");
return pika_false;
}
pika_bool threading_Condition_acquire(PikaObj* self,
pika_bool block,
Arg* timeout) {
return pika_false;
}
void threading_Condition_release(PikaObj* self) {}

View File

@ -0,0 +1,49 @@
#include "threading_Lock.h"
#include "PikaPlatformEx.h"
void threading_Lock___del__(PikaObj* self) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
// pika_platform_thread_mutex_unlock(m);
pika_platform_thread_mutex_destroy(m);
pikaFree(m, sizeof(pika_platform_thread_mutex_t));
}
void threading_Lock___init__(PikaObj* self) {
pika_platform_thread_mutex_t* m =
pikaMalloc(sizeof(pika_platform_thread_mutex_t));
pika_platform_thread_mutex_init(m);
obj_setPtr(self, "_mutex_", m);
}
pika_bool threading_Lock_acquire(PikaObj* self, pika_bool block, Arg* timeout) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
int result = pika_platform_thread_mutex_timedlock(m, block, timeout);
if (result == PIKA_RES_ERR_INVALID_PARAM) {
obj_setErrorCode(self, PIKA_RES_ERR_INVALID_PARAM);
obj_setSysOut(self, "invalid param!");
}
return result == 0 ? pika_true : pika_false;
}
pika_bool threading_Lock_locked(PikaObj* self) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
pika_GIL_EXIT();
int result = pika_platform_thread_mutex_trylock(m);
pika_GIL_ENTER();
if (result == 0) {
// 成功获得了锁,需要解锁
// pika_GIL_EXIT();
pika_platform_thread_mutex_unlock(m);
// pika_GIL_ENTER();/* */
return pika_false; // 锁未被占用/* */
} else {
// 锁已被占用或发生了其他错误
// perror("pthread_mutex_trylock");
return pika_true;
}
}
void threading_Lock_release(PikaObj* self) {
pika_platform_thread_mutex_t* m = obj_getPtr(self, "_mutex_");
// pika_GIL_EXIT();
pika_platform_thread_mutex_unlock(m);
// pika_GIL_ENTER();
}

View File

@ -0,0 +1,34 @@
#include "threading_Lock.h"
#include "PikaPlatformEx.h"
void threading_RLock___del__(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
pika_platform_thread_rtmutex_destroy(rm);
pikaFree(rm, sizeof(pika_platform_thread_rtmutex_t));
}
void threading_RLock___init__(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm =
pikaMalloc(sizeof(pika_platform_thread_rtmutex_t));
pika_platform_thread_rtmutex_init(rm);
obj_setPtr(self, "_rmutex_", rm);
}
pika_bool threading_RLock_acquire(PikaObj* self,
pika_bool block,
Arg* timeout) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
int result = pika_platform_thread_rtmutex_lock(rm, block, timeout);
return result == 0 ? pika_true : pika_false;
}
pika_bool threading_RLock_locked(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
int result = pika_platform_thread_rtmutex_locked(rm);
return result != 0 ? pika_true : pika_false;
}
void threading_RLock_release(PikaObj* self) {
pika_platform_thread_rtmutex_t* rm = obj_getPtr(self, "_rmutex_");
pika_platform_thread_rtmutex_unlock(rm);
}

View File

@ -0,0 +1,52 @@
import _thread
from PikaObj import *
def get_ident() ->int64: ...
class Lock():
def __init__(self): # real signature unknown
self._mutex_ = None
def acquire(self, block:bool, timeout:any) -> bool: ... # real signature unknown; restored from __doc__
def locked(self) -> bool: ...# real signature unknown; restored from __doc__
def release(self): ... # real signature unknown; restored from __doc__
def __del__(self): ...
class RLock():
def __init__(self): # real signature unknown
self._rmutex_ = None
def acquire(self, block:bool, timeout:any) -> bool: ...# real signature unknown; restored from __doc__
def locked(self) -> bool: ... # real signature unknown; restored from __doc__
def release(self): ...# real signature unknown; restored from __doc__
def __del__(self): ...
class Condition:
def __init__(self): # real signature unknown
self._cond_ = None
self._rtmetux_ = None
def wait(self, timeout:any) -> bool: ...
def notify(self, n:int): ...
def notify_all(self): ...
def _is_owned(self) -> bool: ...
def __del__(self): ...
def acquire(self, block:bool, timeout:any) -> bool: ...
def release(self): ...

View File

@ -1373,4 +1373,6 @@ TEST(jrpc, exec_concat_str_space) {
free(response);
}
TEST_RUN_SINGLE_FILE(threading, lock_rlock, "test/python/threading/lock_rlock.py")
TEST_END

View File

@ -0,0 +1,83 @@
import _thread
import time
import threading
import PikaStdLib
# 共享资源
shared_resource = 0
# 互斥锁
mutex = threading.Lock()
# 线程函数
finished = 0
def thread_function(name, delay):
global shared_resource
global mutex, finished
print("delay : %s" % str(delay))
k = 0
i = 0
mem = PikaStdLib.MemChecker()
for i in range(5):
# while 1:
try:
# 获取互斥锁
print("%s try to acquire lock. #1" % name)
res = mutex.acquire(True, None)
print("res: %s" % str(res))
if 1: # 测试RLock或者Lock的超时加上
print("%s try to acquire lock. #2" % name)
res = mutex.acquire(True, 0.5)
print("res: %s" % str(res))
if res:
print("%s acquire lock SUCC." % name)
else:
print("%s acquire lock FAIL." % name)
# 打印当前线程名称和共享资源的值
print("Thread %s: Iteration %d, Shared Resource: %d" %
(name, i, shared_resource))
# 更新共享资源
shared_resource += 1
# 模拟工作时间
time.sleep(delay)
print("wake")
# 释放互斥锁
mutex.release()
mutex.release()
k += 1
print("%s i = %d." % (name, i))
# print('mem used now:')
# mem.now()
except:
print("------------- error ---------------")
print("%s exit , at last, i = %d." % (name, k))
finished += 1
# 主函数
def main():
# 创建第一个线程
_thread.start_new_thread(thread_function, ("Thread-1", 0.1))
time.sleep(0.5)
# 创建第二个线程
_thread.start_new_thread(thread_function, ("Thread-2", 0.2))
# 主线程等待子线程结束
# 由于 _thread 没有 join 方法,我们通过 sleep 来模拟等待
# time.sleep(60)
while finished < 2:
time.sleep(1)
main()