2023-12-07 22:22:42 +08:00

1087 lines
36 KiB
C

/*
* Copyright (c) 2020, Armink, <armink.ztl@gmail.com>
*
* SPDX-License-Identifier: Apache-2.0
*/
/**
* @file
* @brief TSDB feature.
*
* Time series log (like TSDB) feature implement source file.
*
* TSL is time series log, the TSDB saved many TSLs.
*/
#include <inttypes.h>
#include <string.h>
#include "flashdb.h"
#include "fdb_low_lvl.h"
#define FDB_LOG_TAG "[tsl]"
/* rewrite log prefix */
#undef FDB_LOG_PREFIX2
#define FDB_LOG_PREFIX2() \
FDB_PRINT("[%s][%s] ", db_name(db), _fdb_db_path((fdb_db_t)db))
#if defined(FDB_USING_TSDB)
#if (FDB_WRITE_GRAN == 64)
#error "Flash 64 bits write granularity is not supported in TSDB yet!"
#endif
/* magic word(`T`, `S`, `L`, `0`) */
#define SECTOR_MAGIC_WORD 0x304C5354
#define TSL_STATUS_TABLE_SIZE FDB_STATUS_TABLE_SIZE(FDB_TSL_STATUS_NUM)
#define SECTOR_HDR_DATA_SIZE (FDB_WG_ALIGN(sizeof(struct sector_hdr_data)))
#define LOG_IDX_DATA_SIZE (FDB_WG_ALIGN(sizeof(struct log_idx_data)))
#define LOG_IDX_TS_OFFSET ((unsigned long)(&((struct log_idx_data*)0)->time))
#define SECTOR_MAGIC_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->magic))
#define SECTOR_START_TIME_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->start_time))
#define SECTOR_END0_TIME_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->end_info[0].time))
#define SECTOR_END0_IDX_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->end_info[0].index))
#define SECTOR_END0_STATUS_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->end_info[0].status))
#define SECTOR_END1_TIME_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->end_info[1].time))
#define SECTOR_END1_IDX_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->end_info[1].index))
#define SECTOR_END1_STATUS_OFFSET \
((unsigned long)(&((struct sector_hdr_data*)0)->end_info[1].status))
/* the next address is get failed */
#define FAILED_ADDR 0xFFFFFFFF
#define db_name(db) (((fdb_db_t)db)->name)
#define db_init_ok(db) (((fdb_db_t)db)->init_ok)
#define db_sec_size(db) (((fdb_db_t)db)->sec_size)
#define db_max_size(db) (((fdb_db_t)db)->max_size)
#define db_oldest_addr(db) (((fdb_db_t)db)->oldest_addr)
#define db_lock(db) \
do { \
if (((fdb_db_t)db)->lock) \
((fdb_db_t)db)->lock((fdb_db_t)db); \
} while (0);
#define db_unlock(db) \
do { \
if (((fdb_db_t)db)->unlock) \
((fdb_db_t)db)->unlock((fdb_db_t)db); \
} while (0);
#define _FDB_WRITE_STATUS(db, addr, status_table, status_num, status_index, \
sync) \
do { \
result = _fdb_write_status((fdb_db_t)db, addr, status_table, \
status_num, status_index, sync); \
if (result != FDB_NO_ERR) \
return result; \
} while (0);
#define FLASH_WRITE(db, addr, buf, size, sync) \
do { \
result = _fdb_flash_write((fdb_db_t)db, addr, buf, size, sync); \
if (result != FDB_NO_ERR) \
return result; \
} while (0);
struct sector_hdr_data {
uint8_t status[FDB_STORE_STATUS_TABLE_SIZE]; /**< sector store status @see
fdb_sector_store_status_t */
uint32_t magic; /**< magic word(`T`, `S`, `L`, `0`) */
fdb_time_t start_time; /**< the first start node's timestamp */
struct {
fdb_time_t time; /**< the last end node's timestamp */
uint32_t index; /**< the last end node's index */
uint8_t status[TSL_STATUS_TABLE_SIZE]; /**< end node status, @see
fdb_tsl_status_t */
} end_info[2];
uint32_t reserved;
};
typedef struct sector_hdr_data* sector_hdr_data_t;
/* time series log node index data */
struct log_idx_data {
uint8_t status_table[TSL_STATUS_TABLE_SIZE]; /**< node status, @see
fdb_tsl_status_t */
fdb_time_t time; /**< node timestamp */
uint32_t log_len; /**< node total length (header + name + value), must align
by FDB_WRITE_GRAN */
uint32_t log_addr; /**< node address */
};
typedef struct log_idx_data* log_idx_data_t;
struct query_count_args {
fdb_tsl_status_t status;
size_t count;
};
struct check_sec_hdr_cb_args {
fdb_tsdb_t db;
pika_bool check_failed;
size_t empty_num;
uint32_t empty_addr;
};
static fdb_err_t read_tsl(fdb_tsdb_t db, fdb_tsl_t tsl) {
struct log_idx_data idx;
/* read TSL index raw data */
_fdb_flash_read((fdb_db_t)db, tsl->addr.index, (uint32_t*)&idx,
sizeof(struct log_idx_data));
tsl->status =
(fdb_tsl_status_t)_fdb_get_status(idx.status_table, FDB_TSL_STATUS_NUM);
if ((tsl->status == FDB_TSL_PRE_WRITE) || (tsl->status == FDB_TSL_UNUSED)) {
tsl->log_len = db->max_len;
tsl->addr.log = FDB_DATA_UNUSED;
tsl->time = 0;
} else {
tsl->log_len = idx.log_len;
tsl->addr.log = idx.log_addr;
tsl->time = idx.time;
}
return FDB_NO_ERR;
}
static uint32_t get_next_sector_addr(fdb_tsdb_t db,
tsdb_sec_info_t pre_sec,
uint32_t traversed_len) {
if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
if (pre_sec->addr + db_sec_size(db) < db_max_size(db)) {
return pre_sec->addr + db_sec_size(db);
} else {
/* the next sector is on the top of the database */
return 0;
}
} else {
/* finished */
return FAILED_ADDR;
}
}
static uint32_t get_next_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl) {
uint32_t addr = FAILED_ADDR;
if (sector->status == FDB_SECTOR_STORE_EMPTY) {
return FAILED_ADDR;
}
if (pre_tsl->addr.index + LOG_IDX_DATA_SIZE <= sector->end_idx) {
addr = pre_tsl->addr.index + LOG_IDX_DATA_SIZE;
} else {
/* no TSL */
return FAILED_ADDR;
}
return addr;
}
static uint32_t get_last_tsl_addr(tsdb_sec_info_t sector, fdb_tsl_t pre_tsl) {
uint32_t addr = FAILED_ADDR;
if (sector->status == FDB_SECTOR_STORE_EMPTY) {
return FAILED_ADDR;
}
if (pre_tsl->addr.index >=
(sector->addr + SECTOR_HDR_DATA_SIZE + LOG_IDX_DATA_SIZE)) {
addr = pre_tsl->addr.index - LOG_IDX_DATA_SIZE;
} else {
return FAILED_ADDR;
}
return addr;
}
static uint32_t get_last_sector_addr(fdb_tsdb_t db,
tsdb_sec_info_t pre_sec,
uint32_t traversed_len) {
if (traversed_len + db_sec_size(db) <= db_max_size(db)) {
if (pre_sec->addr >= db_sec_size(db)) {
/* the next sector is previous sector */
return pre_sec->addr - db_sec_size(db);
} else {
/* the next sector is the last sector */
return db_max_size(db) - db_sec_size(db);
}
} else {
return FAILED_ADDR;
}
}
static fdb_err_t read_sector_info(fdb_tsdb_t db,
uint32_t addr,
tsdb_sec_info_t sector,
pika_bool traversal) {
fdb_err_t result = FDB_NO_ERR;
struct sector_hdr_data sec_hdr;
FDB_ASSERT(sector);
/* read sector header raw data */
_fdb_flash_read((fdb_db_t)db, addr, (uint32_t*)&sec_hdr,
sizeof(struct sector_hdr_data));
sector->addr = addr;
sector->magic = sec_hdr.magic;
/* check magic word */
if (sector->magic != SECTOR_MAGIC_WORD) {
sector->check_ok = pika_false;
return FDB_INIT_FAILED;
}
sector->check_ok = pika_true;
sector->status = (fdb_sector_store_status_t)_fdb_get_status(
sec_hdr.status, FDB_SECTOR_STORE_STATUS_NUM);
sector->start_time = sec_hdr.start_time;
sector->end_info_stat[0] = (fdb_tsl_status_t)_fdb_get_status(
sec_hdr.end_info[0].status, FDB_TSL_STATUS_NUM);
sector->end_info_stat[1] = (fdb_tsl_status_t)_fdb_get_status(
sec_hdr.end_info[1].status, FDB_TSL_STATUS_NUM);
if (sector->end_info_stat[0] == FDB_TSL_WRITE) {
sector->end_time = sec_hdr.end_info[0].time;
sector->end_idx = sec_hdr.end_info[0].index;
} else if (sector->end_info_stat[1] == FDB_TSL_WRITE) {
sector->end_time = sec_hdr.end_info[1].time;
sector->end_idx = sec_hdr.end_info[1].index;
} else if (sector->end_info_stat[0] == FDB_TSL_PRE_WRITE &&
sector->end_info_stat[1] == FDB_TSL_PRE_WRITE) {
// TODO There is no valid end node info on this sector, need impl fast
// query this sector by fdb_tsl_iter_by_time
FDB_ASSERT(0);
}
/* traversal all TSL and calculate the remain space size */
sector->empty_idx = sector->addr + SECTOR_HDR_DATA_SIZE;
sector->empty_data = sector->addr + db_sec_size(db);
/* the TSL's data is saved from sector bottom, and the TSL's index saved
* from the sector top */
sector->remain = sector->empty_data - sector->empty_idx;
if (sector->status == FDB_SECTOR_STORE_USING && traversal) {
struct fdb_tsl tsl;
tsl.addr.index = sector->empty_idx;
while (read_tsl(db, &tsl) == FDB_NO_ERR) {
if (tsl.status == FDB_TSL_UNUSED) {
break;
}
sector->end_time = tsl.time;
sector->end_idx = tsl.addr.index;
sector->empty_idx += LOG_IDX_DATA_SIZE;
sector->empty_data -= FDB_WG_ALIGN(tsl.log_len);
tsl.addr.index += LOG_IDX_DATA_SIZE;
if (sector->remain >
LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(tsl.log_len)) {
sector->remain -=
(LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(tsl.log_len));
} else {
FDB_INFO("Error: this TSL (0x%08" PRIX32 ") size (%" PRIu32
") is out of bound.\n",
tsl.addr.index, tsl.log_len);
sector->remain = 0;
result = FDB_READ_ERR;
break;
}
}
}
return result;
}
static fdb_err_t format_sector(fdb_tsdb_t db, uint32_t addr) {
fdb_err_t result = FDB_NO_ERR;
struct sector_hdr_data sec_hdr;
FDB_ASSERT(addr % db_sec_size(db) == 0);
result = _fdb_flash_erase((fdb_db_t)db, addr, db_sec_size(db));
if (result == FDB_NO_ERR) {
_FDB_WRITE_STATUS(db, addr, sec_hdr.status, FDB_SECTOR_STORE_STATUS_NUM,
FDB_SECTOR_STORE_EMPTY, pika_true);
/* set the magic */
sec_hdr.magic = SECTOR_MAGIC_WORD;
FLASH_WRITE(db, addr + SECTOR_MAGIC_OFFSET, &sec_hdr.magic,
sizeof(sec_hdr.magic), pika_true);
}
return result;
}
static void sector_iterator(fdb_tsdb_t db,
tsdb_sec_info_t sector,
fdb_sector_store_status_t status,
void* arg1,
void* arg2,
pika_bool (*callback)(tsdb_sec_info_t sector,
void* arg1,
void* arg2),
pika_bool traversal) {
uint32_t sec_addr = sector->addr, traversed_len = 0;
/* search all sectors */
do {
read_sector_info(db, sec_addr, sector, pika_false);
if (status == FDB_SECTOR_STORE_UNUSED || status == sector->status) {
if (traversal) {
read_sector_info(db, sec_addr, sector, pika_true);
}
/* iterator is interrupted when callback return pika_true */
if (callback && callback(sector, arg1, arg2)) {
return;
}
}
traversed_len += db_sec_size(db);
} while ((sec_addr = get_next_sector_addr(db, sector, traversed_len)) !=
FAILED_ADDR);
}
static fdb_err_t write_tsl(fdb_tsdb_t db, fdb_blob_t blob, fdb_time_t time) {
fdb_err_t result = FDB_NO_ERR;
struct log_idx_data idx;
uint32_t idx_addr = db->cur_sec.empty_idx;
idx.log_len = blob->size;
idx.time = time;
idx.log_addr = db->cur_sec.empty_data - FDB_WG_ALIGN(idx.log_len);
/* write the status will by write granularity */
_FDB_WRITE_STATUS(db, idx_addr, idx.status_table, FDB_TSL_STATUS_NUM,
FDB_TSL_PRE_WRITE, pika_false);
/* write other index info */
FLASH_WRITE(db, idx_addr + LOG_IDX_TS_OFFSET, &idx.time,
sizeof(struct log_idx_data) - LOG_IDX_TS_OFFSET, pika_false);
/* write blob data */
FLASH_WRITE(db, idx.log_addr, blob->buf, blob->size, pika_false);
/* write the status will by write granularity */
_FDB_WRITE_STATUS(db, idx_addr, idx.status_table, FDB_TSL_STATUS_NUM,
FDB_TSL_WRITE, pika_true);
return result;
}
static fdb_err_t update_sec_status(fdb_tsdb_t db,
tsdb_sec_info_t sector,
fdb_blob_t blob,
fdb_time_t cur_time) {
fdb_err_t result = FDB_NO_ERR;
uint8_t status[FDB_STORE_STATUS_TABLE_SIZE];
if (sector->status == FDB_SECTOR_STORE_USING &&
sector->remain < LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size)) {
uint8_t end_status[TSL_STATUS_TABLE_SIZE];
uint32_t end_index = sector->empty_idx - LOG_IDX_DATA_SIZE,
new_sec_addr, cur_sec_addr = sector->addr;
/* save the end node index and timestamp */
if (sector->end_info_stat[0] == FDB_TSL_UNUSED) {
_FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END0_STATUS_OFFSET,
end_status, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE,
pika_false);
FLASH_WRITE(db, cur_sec_addr + SECTOR_END0_TIME_OFFSET,
(uint32_t*)&db->last_time, sizeof(fdb_time_t),
pika_false);
FLASH_WRITE(db, cur_sec_addr + SECTOR_END0_IDX_OFFSET, &end_index,
sizeof(end_index), pika_false);
_FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END0_STATUS_OFFSET,
end_status, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE,
pika_true);
} else if (sector->end_info_stat[1] == FDB_TSL_UNUSED) {
_FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END1_STATUS_OFFSET,
end_status, FDB_TSL_STATUS_NUM, FDB_TSL_PRE_WRITE,
pika_false);
FLASH_WRITE(db, cur_sec_addr + SECTOR_END1_TIME_OFFSET,
(uint32_t*)&db->last_time, sizeof(fdb_time_t),
pika_false);
FLASH_WRITE(db, cur_sec_addr + SECTOR_END1_IDX_OFFSET, &end_index,
sizeof(end_index), pika_false);
_FDB_WRITE_STATUS(db, cur_sec_addr + SECTOR_END1_STATUS_OFFSET,
end_status, FDB_TSL_STATUS_NUM, FDB_TSL_WRITE,
pika_true);
}
/* change current sector to full */
_FDB_WRITE_STATUS(db, cur_sec_addr, status, FDB_SECTOR_STORE_STATUS_NUM,
FDB_SECTOR_STORE_FULL, pika_true);
sector->status = FDB_SECTOR_STORE_FULL;
/* calculate next sector address */
if (sector->addr + db_sec_size(db) < db_max_size(db)) {
new_sec_addr = sector->addr + db_sec_size(db);
} else if (db->rollover) {
new_sec_addr = 0;
} else {
/* not rollover */
return FDB_SAVED_FULL;
}
read_sector_info(db, new_sec_addr, &db->cur_sec, pika_false);
if (sector->status != FDB_SECTOR_STORE_EMPTY) {
/* calculate the oldest sector address */
if (new_sec_addr + db_sec_size(db) < db_max_size(db)) {
db_oldest_addr(db) = new_sec_addr + db_sec_size(db);
} else {
db_oldest_addr(db) = 0;
}
format_sector(db, new_sec_addr);
read_sector_info(db, new_sec_addr, &db->cur_sec, pika_false);
}
} else if (sector->status == FDB_SECTOR_STORE_FULL) {
/* database full */
return FDB_SAVED_FULL;
}
if (sector->status == FDB_SECTOR_STORE_EMPTY) {
/* change the sector to using */
sector->status = FDB_SECTOR_STORE_USING;
sector->start_time = cur_time;
_FDB_WRITE_STATUS(db, sector->addr, status, FDB_SECTOR_STORE_STATUS_NUM,
FDB_SECTOR_STORE_USING, pika_true);
/* save the start timestamp */
FLASH_WRITE(db, sector->addr + SECTOR_START_TIME_OFFSET,
(uint32_t*)&cur_time, sizeof(fdb_time_t), pika_true);
}
return result;
}
static fdb_err_t tsl_append(fdb_tsdb_t db, fdb_blob_t blob) {
fdb_err_t result = FDB_NO_ERR;
fdb_time_t cur_time = db->get_time();
FDB_ASSERT(blob->size <= db->max_len);
/* check the current timestamp, MUST more than the last save timestamp */
if (cur_time <= db->last_time) {
FDB_INFO("Warning: current timestamp (%" PRIdMAX
") is less than or equal to the last save timestamp (%" PRIdMAX
"). This tsl will be dropped.\n",
(intmax_t)cur_time, (intmax_t)(db->last_time));
return FDB_WRITE_ERR;
}
result = update_sec_status(db, &db->cur_sec, blob, cur_time);
if (result != FDB_NO_ERR) {
return result;
}
/* write the TSL node */
result = write_tsl(db, blob, cur_time);
if (result != FDB_NO_ERR) {
return result;
}
/* recalculate the current using sector info */
db->cur_sec.end_idx = db->cur_sec.empty_idx;
db->cur_sec.end_time = cur_time;
db->cur_sec.empty_idx += LOG_IDX_DATA_SIZE;
db->cur_sec.empty_data -= FDB_WG_ALIGN(blob->size);
db->cur_sec.remain -= LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size);
db->last_time = cur_time;
return result;
}
/**
* Append a new log to TSDB.
*
* @param db database object
* @param blob log blob data
*
* @return result
*/
fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob) {
fdb_err_t result = FDB_NO_ERR;
if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
return FDB_INIT_FAILED;
}
db_lock(db);
result = tsl_append(db, blob);
db_unlock(db);
return result;
}
/**
* The TSDB iterator for each TSL.
*
* @param db database object
* @param cb callback
* @param arg callback argument
*/
void fdb_tsl_iter(fdb_tsdb_t db, fdb_tsl_cb cb, void* arg) {
struct tsdb_sec_info sector;
uint32_t sec_addr, traversed_len = 0;
struct fdb_tsl tsl;
if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
}
if (cb == NULL) {
return;
}
sec_addr = db_oldest_addr(db);
db_lock(db);
/* search all sectors */
do {
traversed_len += db_sec_size(db);
if (read_sector_info(db, sec_addr, &sector, pika_false) != FDB_NO_ERR) {
continue;
}
/* sector has TSL */
if (sector.status == FDB_SECTOR_STORE_USING ||
sector.status == FDB_SECTOR_STORE_FULL) {
if (sector.status == FDB_SECTOR_STORE_USING) {
/* copy the current using sector status */
sector = db->cur_sec;
}
tsl.addr.index = sector.addr + SECTOR_HDR_DATA_SIZE;
/* search all TSL */
do {
read_tsl(db, &tsl);
/* iterator is interrupted when callback return pika_true */
if (cb(&tsl, arg)) {
db_unlock(db);
return;
}
} while ((tsl.addr.index = get_next_tsl_addr(&sector, &tsl)) !=
FAILED_ADDR);
}
} while ((sec_addr = get_next_sector_addr(db, &sector, traversed_len)) !=
FAILED_ADDR);
db_unlock(db);
}
/**
* The TSDB iterator for each TSL.
*
* @param db database object
* @param cb callback
* @param arg callback argument
*/
void fdb_tsl_iter_reverse(fdb_tsdb_t db, fdb_tsl_cb cb, void* cb_arg) {
struct tsdb_sec_info sector;
uint32_t sec_addr, traversed_len = 0;
struct fdb_tsl tsl;
if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
}
if (cb == NULL) {
return;
}
sec_addr = db->cur_sec.addr;
db_lock(db);
/* search all sectors */
do {
traversed_len += db_sec_size(db);
if (read_sector_info(db, sec_addr, &sector, pika_false) != FDB_NO_ERR) {
continue;
}
/* sector has TSL */
if (sector.status == FDB_SECTOR_STORE_USING ||
sector.status == FDB_SECTOR_STORE_FULL) {
if (sector.status == FDB_SECTOR_STORE_USING) {
/* copy the current using sector status */
sector = db->cur_sec;
}
tsl.addr.index = sector.end_idx;
/* search all TSL */
do {
read_tsl(db, &tsl);
/* iterator is interrupted when callback return pika_true */
if (cb(&tsl, cb_arg)) {
goto __exit;
}
} while ((tsl.addr.index = get_last_tsl_addr(&sector, &tsl)) !=
FAILED_ADDR);
} else if (sector.status == FDB_SECTOR_STORE_EMPTY ||
sector.status == FDB_SECTOR_STORE_UNUSED)
goto __exit;
} while ((sec_addr = get_last_sector_addr(db, &sector, traversed_len)) !=
FAILED_ADDR);
__exit:
db_unlock(db);
}
/*
* Found the matched TSL address.
*/
static int search_start_tsl_addr(fdb_tsdb_t db,
int start,
int end,
fdb_time_t from,
fdb_time_t to) {
struct fdb_tsl tsl;
while (pika_true) {
tsl.addr.index =
start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);
read_tsl(db, &tsl);
if (tsl.time < from) {
start = tsl.addr.index + LOG_IDX_DATA_SIZE;
} else if (tsl.time > from) {
end = tsl.addr.index - LOG_IDX_DATA_SIZE;
} else {
return tsl.addr.index;
}
if (start > end) {
if (from > to) {
tsl.addr.index = start;
read_tsl(db, &tsl);
if (tsl.time > from) {
start -= LOG_IDX_DATA_SIZE;
}
}
break;
}
}
return start;
}
/**
* The TSDB iterator for each TSL by timestamp.
*
* @param db database object
* @param from starting timestamp. It will be a reverse iterator when ending
* timestamp less than starting timestamp
* @param to ending timestamp
* @param cb callback
* @param arg callback argument
*/
void fdb_tsl_iter_by_time(fdb_tsdb_t db,
fdb_time_t from,
fdb_time_t to,
fdb_tsl_cb cb,
void* cb_arg) {
struct tsdb_sec_info sector;
uint32_t sec_addr, start_addr, traversed_len = 0;
struct fdb_tsl tsl;
pika_bool found_start_tsl = pika_false;
uint32_t (*get_sector_addr)(fdb_tsdb_t, tsdb_sec_info_t, uint32_t);
uint32_t (*get_tsl_addr)(tsdb_sec_info_t, fdb_tsl_t);
if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
}
if (from <= to) {
start_addr = db_oldest_addr(db);
get_sector_addr = get_next_sector_addr;
get_tsl_addr = get_next_tsl_addr;
} else {
start_addr = db->cur_sec.addr;
get_sector_addr = get_last_sector_addr;
get_tsl_addr = get_last_tsl_addr;
}
// FDB_INFO("from %s", ctime((const time_t * )&from));
// FDB_INFO("to %s", ctime((const time_t * )&to));
if (cb == NULL) {
return;
}
sec_addr = start_addr;
db_lock(db);
/* search all sectors */
do {
traversed_len += db_sec_size(db);
if (read_sector_info(db, sec_addr, &sector, pika_false) != FDB_NO_ERR) {
continue;
}
/* sector has TSL */
if ((sector.status == FDB_SECTOR_STORE_USING ||
sector.status == FDB_SECTOR_STORE_FULL)) {
if (sector.status == FDB_SECTOR_STORE_USING) {
/* copy the current using sector status */
sector = db->cur_sec;
}
if ((found_start_tsl) ||
(!found_start_tsl &&
((from <= to &&
((sec_addr == start_addr && from <= sector.start_time) ||
from <= sector.end_time)) ||
(from > to &&
((sec_addr == start_addr && from >= sector.end_time) ||
from >= sector.start_time))))) {
uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE,
end = sector.end_idx;
found_start_tsl = pika_true;
/* search the first start TSL address */
tsl.addr.index =
search_start_tsl_addr(db, start, end, from, to);
/* search all TSL */
do {
read_tsl(db, &tsl);
if (tsl.status != FDB_TSL_UNUSED) {
if ((from <= to && tsl.time >= from &&
tsl.time <= to) ||
(from > to && tsl.time <= from && tsl.time >= to)) {
/* iterator is interrupted when callback return
* pika_true
*/
if (cb(&tsl, cb_arg)) {
goto __exit;
}
} else {
goto __exit;
}
}
} while ((tsl.addr.index = get_tsl_addr(&sector, &tsl)) !=
FAILED_ADDR);
}
} else if (sector.status == FDB_SECTOR_STORE_EMPTY) {
goto __exit;
}
} while ((sec_addr = get_sector_addr(db, &sector, traversed_len)) !=
FAILED_ADDR);
__exit:
db_unlock(db);
}
static pika_bool query_count_cb(fdb_tsl_t tsl, void* arg) {
struct query_count_args* args = arg;
if (tsl->status == args->status) {
args->count++;
}
return pika_false;
}
/**
* Query some TSL's count by timestamp and status.
*
* @param db database object
* @param from starting timestamp
* @param to ending timestamp
* @param status status
*/
size_t fdb_tsl_query_count(fdb_tsdb_t db,
fdb_time_t from,
fdb_time_t to,
fdb_tsl_status_t status) {
struct query_count_args arg = {FDB_TSL_UNUSED, 0};
arg.status = status;
if (!db_init_ok(db)) {
FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));
return FDB_INIT_FAILED;
}
fdb_tsl_iter_by_time(db, from, to, query_count_cb, &arg);
return arg.count;
}
/**
* Set the TSL status.
*
* @param db database object
* @param tsl TSL object
* @param status status
*
* @return result
*/
fdb_err_t fdb_tsl_set_status(fdb_tsdb_t db,
fdb_tsl_t tsl,
fdb_tsl_status_t status) {
fdb_err_t result = FDB_NO_ERR;
uint8_t status_table[TSL_STATUS_TABLE_SIZE];
/* write the status will by write granularity */
_FDB_WRITE_STATUS(db, tsl->addr.index, status_table, FDB_TSL_STATUS_NUM,
status, pika_true);
return result;
}
/**
* Convert the TSL object to blob object
*
* @param tsl TSL object
* @param blob blob object
*
* @return new blob object
*/
fdb_blob_t fdb_tsl_to_blob(fdb_tsl_t tsl, fdb_blob_t blob) {
blob->saved.addr = tsl->addr.log;
blob->saved.meta_addr = tsl->addr.index;
blob->saved.len = tsl->log_len;
return blob;
}
static pika_bool check_sec_hdr_cb(tsdb_sec_info_t sector,
void* arg1,
void* arg2) {
struct check_sec_hdr_cb_args* arg = arg1;
fdb_tsdb_t db = arg->db;
if (!sector->check_ok) {
FDB_INFO("Sector (0x%08" PRIX32 ") header info is incorrect.\n",
sector->addr);
(arg->check_failed) = pika_true;
return pika_true;
} else if (sector->status == FDB_SECTOR_STORE_USING) {
if (db->cur_sec.addr == FDB_DATA_UNUSED) {
memcpy(&db->cur_sec, sector, sizeof(struct tsdb_sec_info));
} else {
FDB_INFO(
"Warning: Sector status is wrong, there are multiple sectors "
"in use.\n");
(arg->check_failed) = pika_true;
return pika_true;
}
} else if (sector->status == FDB_SECTOR_STORE_EMPTY) {
(arg->empty_num) += 1;
arg->empty_addr = sector->addr;
if ((arg->empty_num) == 1 && db->cur_sec.addr == FDB_DATA_UNUSED) {
memcpy(&db->cur_sec, sector, sizeof(struct tsdb_sec_info));
}
}
return pika_false;
}
static pika_bool format_all_cb(tsdb_sec_info_t sector, void* arg1, void* arg2) {
fdb_tsdb_t db = arg1;
format_sector(db, sector->addr);
return pika_false;
}
static void tsl_format_all(fdb_tsdb_t db) {
struct tsdb_sec_info sector;
sector.addr = 0;
sector_iterator(db, &sector, FDB_SECTOR_STORE_UNUSED, db, NULL,
format_all_cb, pika_false);
db_oldest_addr(db) = 0;
db->cur_sec.addr = 0;
db->last_time = 0;
/* read the current using sector info */
read_sector_info(db, db->cur_sec.addr, &db->cur_sec, pika_false);
FDB_INFO("All sector format finished.\n");
}
/**
* Clean all the data in the TSDB.
*
* @note It's DANGEROUS. This operation is not reversible.
*
* @param db database object
*/
void fdb_tsl_clean(fdb_tsdb_t db) {
db_lock(db);
tsl_format_all(db);
db_unlock(db);
}
/**
* This function will get or set some options of the database
*
* @param db database object
* @param cmd the control command
* @param arg the argument
*/
void fdb_tsdb_control(fdb_tsdb_t db, int cmd, void* arg) {
FDB_ASSERT(db);
switch (cmd) {
case FDB_TSDB_CTRL_SET_SEC_SIZE:
/* this change MUST before database initialization */
FDB_ASSERT(db->parent.init_ok == pika_false);
db->parent.sec_size = *(uint32_t*)arg;
break;
case FDB_TSDB_CTRL_GET_SEC_SIZE:
*(uint32_t*)arg = db->parent.sec_size;
break;
case FDB_TSDB_CTRL_SET_LOCK:
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
#endif
db->parent.lock = (void (*)(fdb_db_t db))arg;
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic pop
#endif
break;
case FDB_TSDB_CTRL_SET_UNLOCK:
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
#endif
db->parent.unlock = (void (*)(fdb_db_t db))arg;
#if !defined(__ARMCC_VERSION) && defined(__GNUC__)
#pragma GCC diagnostic pop
#endif
break;
case FDB_TSDB_CTRL_SET_ROLLOVER:
/* this change MUST after database initialized */
FDB_ASSERT(db->parent.init_ok == pika_true);
db->rollover = *(pika_bool*)arg;
break;
case FDB_TSDB_CTRL_GET_ROLLOVER:
*(pika_bool*)arg = db->rollover;
break;
case FDB_TSDB_CTRL_GET_LAST_TIME:
*(fdb_time_t*)arg = db->last_time;
break;
case FDB_TSDB_CTRL_SET_FILE_MODE:
#ifdef FDB_USING_FILE_MODE
/* this change MUST before database initialization */
FDB_ASSERT(db->parent.init_ok == pika_false);
db->parent.file_mode = *(pika_bool*)arg;
#else
FDB_INFO(
"Error: set file mode Failed. Please defined the "
"FDB_USING_FILE_MODE macro.");
#endif
break;
case FDB_TSDB_CTRL_SET_MAX_SIZE:
#ifdef FDB_USING_FILE_MODE
/* this change MUST before database initialization */
FDB_ASSERT(db->parent.init_ok == pika_false);
db->parent.max_size = *(uint32_t*)arg;
#endif
break;
case FDB_TSDB_CTRL_SET_NOT_FORMAT:
/* this change MUST before database initialization */
FDB_ASSERT(db->parent.init_ok == pika_false);
db->parent.not_formatable = *(pika_bool*)arg;
break;
}
}
/**
* The time series database initialization.
*
* @param db database object
* @param name database name
* @param path FAL mode: partition name, file mode: database saved directory
* path
* @param get_time get current time function
* @param max_len maximum length of each log
* @param user_data user data
*
* @return result
*/
fdb_err_t fdb_tsdb_init(fdb_tsdb_t db,
const char* name,
const char* path,
fdb_get_time get_time,
size_t max_len,
void* user_data) {
fdb_err_t result = FDB_NO_ERR;
struct tsdb_sec_info sector;
struct check_sec_hdr_cb_args check_sec_arg = {db, pika_false, 0, 0};
FDB_ASSERT(get_time);
result = _fdb_init_ex((fdb_db_t)db, name, path, FDB_DB_TYPE_TS, user_data);
if (result != FDB_NO_ERR) {
goto __exit;
}
db->get_time = get_time;
db->max_len = max_len;
/* default rollover flag is pika_true */
db->rollover = pika_true;
db_oldest_addr(db) = FDB_DATA_UNUSED;
db->cur_sec.addr = FDB_DATA_UNUSED;
/* must less than sector size */
FDB_ASSERT(max_len < db_sec_size(db));
/* check all sector header */
sector.addr = 0;
sector_iterator(db, &sector, FDB_SECTOR_STORE_UNUSED, &check_sec_arg, NULL,
check_sec_hdr_cb, pika_true);
/* format all sector when check failed */
if (check_sec_arg.check_failed) {
if (db->parent.not_formatable) {
result = FDB_READ_ERR;
goto __exit;
} else {
tsl_format_all(db);
}
} else {
uint32_t latest_addr;
if (check_sec_arg.empty_num > 0) {
latest_addr = check_sec_arg.empty_addr;
} else {
if (db->rollover) {
latest_addr = db->cur_sec.addr;
} else {
/* There is no empty sector. */
latest_addr = db->cur_sec.addr =
db_max_size(db) - db_sec_size(db);
}
}
/* db->cur_sec is the latest sector, and the next is the oldest sector
*/
if (latest_addr + db_sec_size(db) >= db_max_size(db)) {
/* db->cur_sec is the the bottom of the database */
db_oldest_addr(db) = 0;
} else {
db_oldest_addr(db) = latest_addr + db_sec_size(db);
}
}
FDB_DEBUG("TSDB (%s) oldest sectors is 0x%08" PRIX32
", current using sector is 0x%08" PRIX32 ".\n",
db_name(db), db_oldest_addr(db), db->cur_sec.addr);
/* read the current using sector info */
read_sector_info(db, db->cur_sec.addr, &db->cur_sec, pika_true);
/* get last save time */
if (db->cur_sec.status == FDB_SECTOR_STORE_USING) {
db->last_time = db->cur_sec.end_time;
} else if (db->cur_sec.status == FDB_SECTOR_STORE_EMPTY &&
db_oldest_addr(db) != db->cur_sec.addr) {
struct tsdb_sec_info sec;
uint32_t addr = db->cur_sec.addr;
if (addr == 0) {
addr = db_max_size(db) - db_sec_size(db);
} else {
addr -= db_sec_size(db);
}
read_sector_info(db, addr, &sec, pika_false);
db->last_time = sec.end_time;
}
__exit:
_fdb_init_finish((fdb_db_t)db, result);
return result;
}
/**
* The time series database deinitialization.
*
* @param db database object
*
* @return result
*/
fdb_err_t fdb_tsdb_deinit(fdb_tsdb_t db) {
_fdb_deinit((fdb_db_t)db);
return FDB_NO_ERR;
}
#endif /* defined(FDB_USING_TSDB) */