ArtOS 快速开发框架

ArtOS 快速开发框架

首页
ArtOS框架
使用文档视频教程
Micropython
官方文档学习教程固件开发
发现
树莓派ESP32
生态圈
智能生活直播物联工业物联Node-RED
开发资源
开发板软件资源器件资源
关于ArtOS
登录 →
ArtOS 快速开发框架

ArtOS 快速开发框架

首页 ArtOS框架
使用文档视频教程
Micropython
官方文档学习教程固件开发
发现
树莓派ESP32
生态圈
智能生活直播物联工业物联Node-RED
开发资源
开发板软件资源器件资源
关于ArtOS
登录
  1. 首页
  2. ArtOS框架
  3. 开发文档
  4. Micropython 时序执行器 SequencerExecutor

Micropython 时序执行器 SequencerExecutor

0
  • 开发文档
  • 发布于 2026-01-29
  • 7 次阅读
极客熊
极客熊
import uasyncio as asyncio


class SeqAction:
    """
    SeqAction
    ==================================================
    时序动作单元(Sequencer Action)

    SequencerExecutor 的最小执行单位,用于描述:

        在 delay(ms) 延迟后,
        输出一个 value(电平/占空比/指令等)。

    典型用途:
        - LED 闪烁节奏(0/1)
        - 蜂鸣器鸣叫模式(on/off)
        - 电机脉冲序列(step pulse)
        - UI 动画状态序列

    属性:
        delay : int
            延迟时间(毫秒)
            - delay=0 表示立即执行
        value : any
            输出值,完全由 cb(value) 的实现决定

    MicroPython 优化:
        使用 __slots__ 避免动态属性字典,
        降低内存占用,适合 ESP32 等嵌入式环境。
    """
    __slots__ = ("delay", "value")

    def __init__(self, delay_ms: int, value):
        """
        :param delay_ms: 动作执行前延迟时间(ms)
        :param value: 动作输出值(由回调 cb(value) 消费)
        """
        self.delay = delay_ms
        self.value = value


class SequencerExecutor:
    """
    SequencerExecutor (ArtOS Enhanced)
    ==================================================
    ArtOS 时序序列执行器(输出节奏控制核心组件)

    用于以严格顺序执行一组 SeqAction:

        action1 -> action2 -> action3 -> ...

    每个动作包含:
        - 延迟 delay
        - 输出 value

    --------------------------------------------------
    设计目标(嵌入式工程约束)
    --------------------------------------------------
    1. 非阻塞执行:
       基于 uasyncio.create_task 调度,不阻塞主循环

    2. 安全停止:
       stop() cancel 任务后必进入 finally 恢复安全态

    3. 防止重入:
       busy-lock 保证同一时刻只有一个序列任务存活

    4. 生命周期可控:
       wait_stopped() 支持框架级 await 停止完成

    5. 安全态恢复:
       safe_value 用于退出后恢复 GPIO 默认电平

    --------------------------------------------------
    核心特性总结
    --------------------------------------------------
        - 顺序动作执行(delay + value)
        - repeat 次数控制(N 次 / 无限循环)
        - cancel 停止(正常退出路径)
        - safe_value 自动恢复
        - busy 防重入
        - await wait_stopped 生命周期管理
    """

    def __init__(self, cb, safe_value=None):
        """
        构造 SequencerExecutor

        :param cb: callable(value)
            输出回调函数(真正执行动作的接口)
            例如:
                cb = lambda v: pin.value(v)

        :param safe_value:
            安全恢复值(停止后自动输出)
            常用于:
                - LED 默认熄灭(0)
                - 蜂鸣器默认关闭(0)
                - 电机默认停止指令
        """
        self.cb = cb
        self.safe_value = safe_value

        # 动作序列缓存(SeqAction 列表)
        self.actions = []

        # repeat 执行次数:
        #   >0 表示执行 N 次
        #   -1 表示无限循环
        self.repeat = 1

        # 已完成的序列次数计数
        self.exec_count = 0

        # 当前运行标志(主循环条件)
        self.running = False

        # uasyncio Task 引用(用于 cancel)
        self._task = None

        # ==================================================
        # ArtOS Enhancement:防重入与生命周期控制
        # ==================================================

        # busy=True 表示仍有 Task 存活
        # start() 会拒绝在 busy 状态下再次启动
        self._busy = False

        # stopping=True 表示 stop() 已触发
        # 可用于未来扩展(区分自然结束 vs stop cancel)
        self._stopping = False

    # ==================================================
    # start:启动序列执行
    # ==================================================
    def start(self, actions, repeat=1):
        """
        启动序列执行(非阻塞)

        :param actions: list[SeqAction]
            动作序列(会浅拷贝,避免外部修改影响执行)

        :param repeat:
            >0 执行 N 次
            -1 无限循环
            0 不执行

        :return bool:
            True  启动成功
            False 启动失败(busy 或参数无效)
        """
        if not actions or repeat == 0:
            return False

        # 防止重入:
        # cancel 后旧任务未退出前,busy 仍为 True
        if self._busy:
            return False

        # 缓存动作序列(浅拷贝)
        self.actions = list(actions)

        # 初始化重复次数与计数器
        self.repeat = repeat
        self.exec_count = 0

        # 设置运行状态
        self.running = True
        self._busy = True
        self._stopping = False

        # 创建异步执行任务(立即返回)
        self._task = asyncio.create_task(self._run())
        return True

    # ==================================================
    # stop:请求停止(非阻塞)
    # ==================================================
    def stop(self):
        """
        请求停止序列执行(非阻塞)

        stop() 的职责:
            1. 设置 running=False
            2. cancel Task
            3. safe_value 恢复交给 finally

        注意:
            cancel() 并不会立即终止任务,
            Task 会在 finally 中完成清理。
        """
        if not self._busy:
            return

        # 标记停止状态
        self.running = False
        self._stopping = True

        # cancel 任务(触发 CancelledError)
        if self._task:
            self._task.cancel()

    # ==================================================
    # wait_stopped:等待完全退出
    # ==================================================
    async def wait_stopped(self):
        """
        等待 Sequencer 完全退出(框架生命周期管理)

        适用场景:
            stop() 后需要确保硬件已恢复 safe_value
            并且 Task 已彻底释放资源。

        实现方式:
            busy=False 由 finally 统一释放
        """
        while self._busy:
            await asyncio.sleep_ms(1)

    # ==================================================
    # _run:内部异步执行循环(核心)
    # ==================================================
    async def _run(self):
        """
        Sequencer 主执行协程

        执行逻辑:
            while running:
                执行完整 actions 序列
                exec_count += 1
                repeat 控制退出

        Cancel 路径:
            stop() -> task.cancel()
            -> raise CancelledError
            -> finally 执行安全恢复与状态释放
        """
        try:
            while self.running:

                # repeat 次数控制(无限循环 repeat=-1)
                if self.repeat != -1 and self.exec_count >= self.repeat:
                    break

                # 顺序执行动作列表
                for action in self.actions:

                    # stop() 后立即中断动作序列
                    if not self.running:
                        break

                    # 延迟执行(毫秒)
                    if action.delay > 0:
                        await asyncio.sleep_ms(action.delay)

                    # 输出动作值(由 cb 实现硬件控制)
                    self.cb(action.value)

                # 完成一次完整序列
                self.exec_count += 1

                # 防止 delay=0 无限循环空转占满 CPU
                await asyncio.sleep_ms(1)

        except asyncio.CancelledError:
            # cancel 属于正常退出路径
            pass

        finally:
            # ==================================================
            # 安全恢复(safe_value)
            # ==================================================
            if self.safe_value is not None:
                try:
                    self.cb(self.safe_value)
                except Exception:
                    # safe_value 恢复失败不应影响状态释放
                    pass

            # ==================================================
            # 状态清理(关键:释放 busy lock)
            # ==================================================
            self.running = False
            self._task = None
            self._busy = False
            self._stopping = False

相关文章
Micropython 指数退避执行器 BackoffExecutor

Micropython 指数退避执行器 BackoffExecutor

import uasyncio as asyncio import urandom class BackoffExecutor: """ BackoffExecutor —— 指数退避执行器(支持 Full Jitter) =========================

Micropython 时序执行器 SequencerExecutor

Micropython 时序执行器 SequencerExecutor

import uasyncio as asyncio class SeqAction: """ SeqAction ================================================== 时序动作单元(Sequencer Action

目录
  • 极客熊
  • 极客熊
  • 极客熊
Copyright © 2026 ArtOS All Rights Reserved. Powered by ArtOS.