.. _kernel_fifo_lifo: 数据传递-FIFO/LIFO #################### API --- 相似API ~~~~~~~ FIFO和LIFO API都是用宏包装queue的API实现,它们有一组类似的发送接收API,这里放在一起介绍 **#define k_fifo_init(fifo)** **#define k_lifo_init(lifo)** 作用:初始化一个struct k_fifo/strut k_lifo fifo/lifo: struct k_fifo/strut k_lifo **#define k_fifo_put(fifo, data)** **#define k_lifo_put(lifo, data)** 作用:将数据放入fifo/lifo fifo/lifo:数据要放入的fifo/lifo data: 要放入fifo/lifo的数据 **#define k_fifo_alloc_put(fifo, data)** **#define k_lifo_alloc_put(lifo, data)** 作用:fifo/lifo分配一个空间,将数据放入 fifo/lifo:数据要放入的fifo/lifo data: 要放入fifo/lifo的数据 **#define k_fifo_get(fifo, timeout)** **#define k_lifo_get(lifo,timeout)** 作用:从fifo/lifo读出数据 fifo/lifo:要读的fifo/lifo data: 读出的数据 FIFO特殊API ~~~~~~~~~~~ FIFO比LIFO多一些API **#define k_fifo_cancel_wait(fifo)** 作用:放弃等待,让第一个等待的fifo的thread退出pending,k_fifo_get的数据将是NULL fifo: 操作的fifo **#define k_fifo_put_list(fifo, head, tail)** 作用:将一个单链表加入到fifo中 fifo: 操作的fifo head: 单链表的第一个节点 tail: 单链表的最后一个节点 **#define k_fifo_put_slist(fifo, list)** 作用:将一个单链表加入到fifo中,这里单链表对象为sys_slist_t fifo: 操作的fifo list: sys_slist_t 单链表 **#define k_fifo_is_empty(fifo)** 作用:检查fifo是否为空 fifo: 要检查的fifo **#define k_fifo_peek_head(fifo)** 作用:peek fifo头部上的节点(下个将会被read的数据),但不会将数据从fifo删除 fifo: 被peek的fifo 返回:返回peek的数据 **#define k_fifo_peek_tail(fifo)** 作用: peek fifo尾部上的节点(FIFO中最后进入的数据),但不会将数据从fifo删除 fifo: 被peek的fifo 返回:返回peek的数据 使用说明 -------- 可以在ISR中put fifo/lifo.也可在ISR内get fifo/fifo,但不能等待。 fifo/lifo不限制数据项的多少。 初始化 ~~~~~~ 经过初始化的fifo/lifo才能被使用,下面两种方法是一样的,显示定义 :: struct k_fifo my_fifo; struct k_lifo my_lifo; k_fifo_init(&my_fifo); k_lifo_init(&my_lifo); 隐式定义: :: K_FIFO_DEFINE(my_fifo); K_LIFO_DEFINE(my_lifo); 写数据 ~~~~~~ 这里用fifo演示put,用Lifo演示alloc_put :: //使用put发送数据,最开始的word必须保留,用于存放queue,这也就是为什么要求4对其的原因 //使用alloc_put没有这个限制 struct data_item_t { void *fifo_reserved; /* 1st word reserved for use by fifo */ ... }; struct data_item_t fifo_tx_data; char lifo_tx_data[32]; void producer_thread(int unused1, int unused2, int unused3) { while (1) { /* create data item to send */ //fifo_tx_data使用的是put,因此最开始的4个字节不能使用 fifo_tx_data = ... lifo_tx_data = ... /* send data to consumers */ k_fifo_put(&my_fifo, &fifo_tx_data); //alloc_put时,queue会从线程池中分配出内存存放lifo,所以不需要保留开始4个字节 //在get时,queue会自动将其释放掉 k_lifo_alloc_put(&my_lifo, &lifo_tx_data); ... } } 读数据 ~~~~~~ :: void consumer_fifo_thread(int unused1, int unused2, int unused3) { struct data_item_t *rx_data; while (1) { rx_data = k_fifo_get(&my_fifo, K_FOREVER); /* process fifo data item */ ... } } void consumer_lifo_thread(int unused1, int unused2, int unused3) { struct data_item_t *rx_data; while (1) { rx_data = k_lifo_get(&my_lifo, K_FOREVER); /* process lifo data item */ ... } } 实现 ==== 前面已经提到过fifo/lifo都是使用queue实现,fifo/lifo是直接包queue的API,每一个fifo/lifo API都对应一个queue的API,可以说fifo/lifo就是queue的使用方法不一样,先看结构体就可以看出使用的就是queue FIFO/LIFO ----------- :: struct k_fifo { struct k_queue _queue; }; struct k_lifo { struct k_queue _queue; }; 每个fifo/lifo都有一个对应的queue API,这里列出来,更详细的可以看include/kernel.h,下面斜体加粗的就是fifo/lifo区别所在 k_fifo_init->k_queue_init k_fifo_cancel_wait->k_queue_cancel_wait **k_fifo_put->k_queue_append** **k_fifo_alloc_put->k_queue_alloc_append** k_fifo_put_list->k_queue_append_list k_fifo_put_slist->k_queue_merge_slist k_fifo_get->k_queue_get k_fifo_is_empty->k_queue_is_empty k_fifo_peek_head->k_queue_peek_head k_fifo_peek_tail->k_queue_peek_tail k_lifo_init->k_queue_init **k_lifo_put->k_queue_prepend** **k_lifo_alloc_put->k_queue_alloc_prepend** k_lifo_get->k_queue_get Queue ----- 从上一节可以看出FIFO就是用queue实现的,因此我们继续分析queue Queue初始化 ~~~~~~~~~~~ 初始化:建立一个wait_q,建立一个单项标志链表 添加数据到queue ~~~~~~~~~~~~~~~ 添加数据到queue基本都是通过queue_insert完成 k_queue_append->queue_insert: 在链表尾部插入 k_queue_insert->queue_insert: 在指定节点后插入 k_queue_prepend->queue_insert:在链表头部插入 z_impl_k_queue_alloc_append->queue_insert:从thread pool中分配节点,并插入到尾部 z_impl_k_queue_alloc_prepend->queue_insert:从thread pool中分配节点,并插入到头部 z_impl_k_queue_peek_head -> z_queue_node_peek: 从链表中读取头,但不删除该节点 z_impl_k_queue_peek_tail -> z_queue_node_peek: 从链表中读取尾,但不删除该节点 :: static s32_t queue_insert(struct k_queue *queue, void *prev, void *data, bool alloc) { k_spinlock_key_t key = k_spin_lock(&queue->lock); #if !defined(CONFIG_POLL) struct k_thread *first_pending_thread; //获取等待queue的thread,如果有的话,将数据提供给该thread,引发调度,直接返回 first_pending_thread = z_unpend_first_thread(&queue->wait_q); if (first_pending_thread != NULL) { prepare_thread_to_run(first_pending_thread, data); z_reschedule(&queue->lock, key); return 0; } #endif /* !CONFIG_POLL */ /* Only need to actually allocate if no threads are pending */ if (alloc) { //要alloc节点空间 struct alloc_node *anode; //从线程池中alloc节点空间 //并初始化数据 anode = z_thread_malloc(sizeof(*anode)); if (anode == NULL) { k_spin_unlock(&queue->lock, key); return -ENOMEM; } anode->data = data; //初始化节点的时候,设置标志为1,表示是alloc的,在get的时候就需要释放 sys_sfnode_init(&anode->node, 0x1); data = anode; } else { //如果data中自带节点,则无需alloc sys_sfnode_init(data, 0x0); } //将节点插入到指定节点之后 //prev如果为NULL,data节点会被插入到单链表的最开始 sys_sflist_insert(&queue->data_q, prev, data); //如果配置了poll,会通知queue条件已经满足 #if defined(CONFIG_POLL) handle_poll_events(queue, K_POLL_STATE_DATA_AVAILABLE); #endif /* CONFIG_POLL */ //引发调度 z_reschedule(&queue->lock, key); return 0; } 当传入参数data是净数据时(没有在data的最开始预留node位置),那么在insert的使用需要用alloc指定要分配节点 从Queue中读取数据 ~~~~~~~~~~~~~~~~~ 使用k_queue_get->z_impl_k_queue_get从queue中读取数据,读取数据时数据从queue中删除,实际的删除动作就是从链表中将节点移除 :: void *z_impl_k_queue_get(struct k_queue *queue, s32_t timeout) { k_spinlock_key_t key = k_spin_lock(&queue->lock); void *data; if (likely(!sys_sflist_is_empty(&queue->data_q))) { sys_sfnode_t *node; //如果链表不为空,从链表中取出头节点 node = sys_sflist_get_not_empty(&queue->data_q); //从节点中取出数据,如果node是从线程池中分配的,将其释放 data = z_queue_node_peek(node, true); k_spin_unlock(&queue->lock, key); //取到数据后立即返回 return data; } if (timeout == K_NO_WAIT) { k_spin_unlock(&queue->lock, key); return NULL; } #if defined(CONFIG_POLL) k_spin_unlock(&queue->lock, key); return k_queue_poll(queue, timeout); #else //没有取到数据,进入等待 //可以结合发送看,如果有线程在等待queue数据,那么新进入queue的数据是不会进入链表的 int ret = z_pend_curr(&queue->lock, key, &queue->wait_q, timeout); return (ret != 0) ? NULL : _current->base.swap_data; #endif /* CONFIG_POLL */ } 从node中提取数据 ~~~~~~~~~~~~~~~~ 从链表中peek或者移出的node,需要使用下面API从node中提取数据 :: void *z_queue_node_peek(sys_sfnode_t *node, bool needs_free) { void *ret; if ((node != NULL) && (sys_sfnode_flags_get(node) != (u8_t)0)) { //检查到flag 不为0,说明加入链表的node是从线程池中分配 struct alloc_node *anode; //这里取出数据后释放node anode = CONTAINER_OF(node, struct alloc_node, node); ret = anode->data; if (needs_free) { k_free(anode); } } else { //不是从线程池分配的node,直接返回 ret = (void *)node; } return ret; } 如果是移出的node在Insert的时候是从线程池alloc出来的,那么在这里需要needs_free=true指定要free空间。如果是Peek的node数据时,则只用提取数据,例如: :: static inline void *z_impl_k_queue_peek_head(struct k_queue *queue) { return z_queue_node_peek(sys_sflist_peek_head(&queue->data_q), false); } static inline void *z_impl_k_queue_peek_tail(struct k_queue *queue) { return z_queue_node_peek(sys_sflist_peek_tail(&queue->data_q), false); } 放弃等待数据 ~~~~~~~~~~~~ k_queue_get的时候如果queue内没有数据,允许进行等待。等待期间可以使用k_queue_cancel_wait放弃等待 :: void z_impl_k_queue_cancel_wait(struct k_queue *queue) { k_spinlock_key_t key = k_spin_lock(&queue->lock); #if !defined(CONFIG_POLL) struct k_thread *first_pending_thread; //获取正在等待的第一个线程 first_pending_thread = z_unpend_first_thread(&queue->wait_q); //让该线程退出等待 if (first_pending_thread != NULL) { prepare_thread_to_run(first_pending_thread, NULL); } #else handle_poll_events(queue, K_POLL_STATE_CANCELLED); #endif /* !CONFIG_POLL */ //重调度 z_reschedule(&queue->lock, key); } 关于poll ~~~~~~~~ 一般情况下queue等待数据是通过让其thread 等待queue的wait_q实现,一旦配置poll后将不再使用该机制,而是通过poll,queue get使用k_queue_poll等待queue有数据,因此可能和其它thread等待poll条件发生冲突 :: static void *k_queue_poll(struct k_queue *queue, s32_t timeout) { struct k_poll_event event; int err, elapsed = 0, done = 0; k_spinlock_key_t key; void *val; u32_t start; //初始化要等待的条件 k_poll_event_init(&event, K_POLL_TYPE_FIFO_DATA_AVAILABLE, K_POLL_MODE_NOTIFY_ONLY, queue); if (timeout != K_FOREVER) { start = k_uptime_get_32(); } do { event.state = K_POLL_STATE_NOT_READY; //开始poll err = k_poll(&event, 1, timeout - elapsed); if (err && err != -EAGAIN) { return NULL; } //通知后去链表拿数据 key = k_spin_lock(&queue->lock); val = z_queue_node_peek(sys_sflist_get(&queue->data_q), true); k_spin_unlock(&queue->lock, key); //如果没拿到数据,说明可能被其它thread将数据拿走,需要重新等待 if ((val == NULL) && (timeout != K_FOREVER)) { elapsed = k_uptime_get_32() - start; done = elapsed > timeout; } } while (!val && !done); return val; } 插入链表数据 ~~~~~~~~~~~~ queue也提供2个API将数据以链表的形式插入到queue中,区别是k_queue_append_list插入后不会影响被插入链表,而k_queue_merge_slist插入链表后会把链表清空 :: int k_queue_append_list(struct k_queue *queue, void *head, void *tail) { /* invalid head or tail of list */ CHECKIF(head == NULL || tail == NULL) { return -EINVAL; } k_spinlock_key_t key = k_spin_lock(&queue->lock); #if !defined(CONFIG_POLL) struct k_thread *thread = NULL; //有等待的thread全部取出来消化这个链表 //注意,消化完后原来的链表还在 if (head != NULL) { thread = z_unpend_first_thread(&queue->wait_q); } while ((head != NULL) && (thread != NULL)) { prepare_thread_to_run(thread, head); head = *(void **)head; thread = z_unpend_first_thread(&queue->wait_q); } //没有消化完的加入到queue中 if (head != NULL) { sys_sflist_append_list(&queue->data_q, head, tail); } #else sys_sflist_append_list(&queue->data_q, head, tail); handle_poll_events(queue, K_POLL_STATE_DATA_AVAILABLE); #endif /* !CONFIG_POLL */ z_reschedule(&queue->lock, key); return 0; } :: int k_queue_merge_slist(struct k_queue *queue, sys_slist_t *list) { int ret; /* list must not be empty */ CHECKIF(sys_slist_is_empty(list)) { return -EINVAL; } //和k_queue_append_list功能一样 ret = k_queue_append_list(queue, list->head, list->tail); CHECKIF(ret != 0) { return ret; } //但消化完链表后,原有链表被清空 sys_slist_init(list); return 0; } 图例 ---- 下面用一张图简单说明FIFO/LIFO和queue操作之间的关系 |QUEUE| 可以看到FIFO是通过k_queue_appen将数据加入到queue的链表尾,LIFO通过k_queue_prepend将数据加入到queue的链表头,而FIFO/LIFO读数据都是用k_queue_get从链表头读数据,所以达到了FIFO先进先出,LIFO后进先出的目的。 参考 ==== https://docs.zephyrproject.org/latest/reference/kernel/data_passing/fifos.html https://docs.zephyrproject.org/latest/reference/kernel/data_passing/lifos.html .. |QUEUE| image:: ../../images/develop/kernel/queue.png