2024-07-03 00:08:43 +08:00

49 lines
1.3 KiB
Python

import _flashdb
import struct
# Module-level object created once, at import time, by calling the native
# `_flashdb.KVDB_CTRL()`.
# NOTE(review): presumably exposes the control-command constants for KVDB
# operations -- confirm against the native binding.
KVDB_CTRL = _flashdb.KVDB_CTRL()
class KVDB(_flashdb.KVDB):
    """Key-value database wrapper adding ``bytes`` and ``struct`` helpers.

    Extends the native ``_flashdb.KVDB`` binding so that blobs are returned
    as ``bytes`` and values can be stored and loaded through ``struct``
    format strings.
    """

    def get_blob(self, key, size):
        """Read the blob stored under *key* and return it as ``bytes``.

        Args:
            key: Key to look up.
            size: Passed through unchanged to the native ``get_blob``.

        Returns:
            The blob as ``bytes`` when the native call yields a list,
            otherwise ``None``.
        """
        res = super().get_blob(key, size)
        # isinstance (rather than an exact type comparison) also accepts
        # list subclasses coming back from the binding.
        if isinstance(res, list):
            return bytes(res)
        return None

    def set_by_fmt(self, key, v, fmt):
        """Pack *v* with the ``struct`` format *fmt* and store it under *key*.

        Args:
            key: Key to store the packed blob under.
            v: Either a list/tuple of values (one per field in *fmt*) or a
                single ``int``, ``float`` or ``bytes`` scalar.
            fmt: ``struct`` format string describing the packed layout.

        Returns:
            Whatever the native ``set_blob`` returns, or ``None`` (nothing
            is stored) when *v* is of an unsupported type.

        Raises:
            struct.error: If the values do not match *fmt*.
        """
        if isinstance(v, (list, tuple)):
            fields = v
        elif isinstance(v, (int, float, bytes)):
            # Scalars are packed as a single field; previously only exact
            # ints were accepted and e.g. floats were silently dropped.
            fields = (v,)
        else:
            # Unsupported value type: keep the historical "store nothing,
            # return None" behaviour instead of raising.
            return None
        return super().set_blob(key, struct.pack(fmt, *fields))

    def get_by_fmt(self, key, size, fmt):
        """Read the blob under *key* and unpack it with *fmt*.

        Args:
            key: Key to look up.
            size: Passed through unchanged to the native ``get_blob``.
            fmt: ``struct`` format string describing the stored layout.

        Returns:
            ``None`` when no blob could be read; the single unpacked value
            when *fmt* describes exactly one field; otherwise the tuple of
            unpacked values.

        Raises:
            struct.error: If the blob length does not match *fmt*.
        """
        # Reuse get_blob so both readers validate the native result the
        # same way (anything that is not a list counts as "no data").
        blob = self.get_blob(key, size)
        if blob is None:
            return None
        values = struct.unpack(fmt, blob)
        return values[0] if len(values) == 1 else values
class TSDB(_flashdb.TSDB):
    """Time-series database wrapper over the native ``_flashdb.TSDB``."""

    def __init__(
        self,
        name: str,
        path: str,
        max_len: int = 1024,
        user_data=None,
    ):
        """Initialise the native database object.

        All four arguments are forwarded positionally to the native
        ``__init__``; this override only supplies the defaults for
        *max_len* and *user_data*.
        """
        super().__init__(name, path, max_len, user_data)

    def tsl_iter_by_time(
        self,
        from_time,
        to_time,
        callback: any,
        user_data: any,
    ) -> int:
        """Print the requested time range, then delegate to the native call.

        Args:
            from_time: Forwarded unchanged as the first native argument.
            to_time: Forwarded unchanged as the second native argument.
            callback: Forwarded unchanged to the native implementation.
            user_data: Forwarded unchanged to the native implementation.

        Returns:
            The value returned by the native ``tsl_iter_by_time``.
        """
        # NOTE(review): these prints look like leftover debug tracing;
        # kept as-is because they are part of the observable output.
        print('tsl_iter_by_time')
        print('from_time:', from_time)
        print('to_time:', to_time)
        result = super().tsl_iter_by_time(
            from_time, to_time, callback, user_data
        )
        return result
class TSL(_flashdb.TSL):
    """Thin alias subclass of the native ``_flashdb.TSL``; adds nothing."""