import uasyncio as asyncio
class SeqAction:
"""
SeqAction
==================================================
时序动作单元(Sequencer Action)
SequencerExecutor 的最小执行单位,用于描述:
在 delay(ms) 延迟后,
输出一个 value(电平/占空比/指令等)。
典型用途:
- LED 闪烁节奏(0/1)
- 蜂鸣器鸣叫模式(on/off)
- 电机脉冲序列(step pulse)
- UI 动画状态序列
属性:
delay : int
延迟时间(毫秒)
- delay=0 表示立即执行
value : any
输出值,完全由 cb(value) 的实现决定
MicroPython 优化:
使用 __slots__ 避免动态属性字典,
降低内存占用,适合 ESP32 等嵌入式环境。
"""
__slots__ = ("delay", "value")
def __init__(self, delay_ms: int, value):
"""
:param delay_ms: 动作执行前延迟时间(ms)
:param value: 动作输出值(由回调 cb(value) 消费)
"""
self.delay = delay_ms
self.value = value
class SequencerExecutor:
"""
SequencerExecutor (ArtOS Enhanced)
==================================================
ArtOS 时序序列执行器(输出节奏控制核心组件)
用于以严格顺序执行一组 SeqAction:
action1 -> action2 -> action3 -> ...
每个动作包含:
- 延迟 delay
- 输出 value
--------------------------------------------------
设计目标(嵌入式工程约束)
--------------------------------------------------
1. 非阻塞执行:
基于 uasyncio.create_task 调度,不阻塞主循环
2. 安全停止:
stop() cancel 任务后必进入 finally 恢复安全态
3. 防止重入:
busy-lock 保证同一时刻只有一个序列任务存活
4. 生命周期可控:
wait_stopped() 支持框架级 await 停止完成
5. 安全态恢复:
safe_value 用于退出后恢复 GPIO 默认电平
--------------------------------------------------
核心特性总结
--------------------------------------------------
- 顺序动作执行(delay + value)
- repeat 次数控制(N 次 / 无限循环)
- cancel 停止(正常退出路径)
- safe_value 自动恢复
- busy 防重入
- await wait_stopped 生命周期管理
"""
def __init__(self, cb, safe_value=None):
"""
构造 SequencerExecutor
:param cb: callable(value)
输出回调函数(真正执行动作的接口)
例如:
cb = lambda v: pin.value(v)
:param safe_value:
安全恢复值(停止后自动输出)
常用于:
- LED 默认熄灭(0)
- 蜂鸣器默认关闭(0)
- 电机默认停止指令
"""
self.cb = cb
self.safe_value = safe_value
# 动作序列缓存(SeqAction 列表)
self.actions = []
# repeat 执行次数:
# >0 表示执行 N 次
# -1 表示无限循环
self.repeat = 1
# 已完成的序列次数计数
self.exec_count = 0
# 当前运行标志(主循环条件)
self.running = False
# uasyncio Task 引用(用于 cancel)
self._task = None
# ==================================================
# ArtOS Enhancement:防重入与生命周期控制
# ==================================================
# busy=True 表示仍有 Task 存活
# start() 会拒绝在 busy 状态下再次启动
self._busy = False
# stopping=True 表示 stop() 已触发
# 可用于未来扩展(区分自然结束 vs stop cancel)
self._stopping = False
# ==================================================
# start:启动序列执行
# ==================================================
def start(self, actions, repeat=1):
"""
启动序列执行(非阻塞)
:param actions: list[SeqAction]
动作序列(会浅拷贝,避免外部修改影响执行)
:param repeat:
>0 执行 N 次
-1 无限循环
0 不执行
:return bool:
True 启动成功
False 启动失败(busy 或参数无效)
"""
if not actions or repeat == 0:
return False
# 防止重入:
# cancel 后旧任务未退出前,busy 仍为 True
if self._busy:
return False
# 缓存动作序列(浅拷贝)
self.actions = list(actions)
# 初始化重复次数与计数器
self.repeat = repeat
self.exec_count = 0
# 设置运行状态
self.running = True
self._busy = True
self._stopping = False
# 创建异步执行任务(立即返回)
self._task = asyncio.create_task(self._run())
return True
# ==================================================
# stop:请求停止(非阻塞)
# ==================================================
def stop(self):
"""
请求停止序列执行(非阻塞)
stop() 的职责:
1. 设置 running=False
2. cancel Task
3. safe_value 恢复交给 finally
注意:
cancel() 并不会立即终止任务,
Task 会在 finally 中完成清理。
"""
if not self._busy:
return
# 标记停止状态
self.running = False
self._stopping = True
# cancel 任务(触发 CancelledError)
if self._task:
self._task.cancel()
# ==================================================
# wait_stopped:等待完全退出
# ==================================================
async def wait_stopped(self):
"""
等待 Sequencer 完全退出(框架生命周期管理)
适用场景:
stop() 后需要确保硬件已恢复 safe_value
并且 Task 已彻底释放资源。
实现方式:
busy=False 由 finally 统一释放
"""
while self._busy:
await asyncio.sleep_ms(1)
# ==================================================
# _run:内部异步执行循环(核心)
# ==================================================
async def _run(self):
"""
Sequencer 主执行协程
执行逻辑:
while running:
执行完整 actions 序列
exec_count += 1
repeat 控制退出
Cancel 路径:
stop() -> task.cancel()
-> raise CancelledError
-> finally 执行安全恢复与状态释放
"""
try:
while self.running:
# repeat 次数控制(无限循环 repeat=-1)
if self.repeat != -1 and self.exec_count >= self.repeat:
break
# 顺序执行动作列表
for action in self.actions:
# stop() 后立即中断动作序列
if not self.running:
break
# 延迟执行(毫秒)
if action.delay > 0:
await asyncio.sleep_ms(action.delay)
# 输出动作值(由 cb 实现硬件控制)
self.cb(action.value)
# 完成一次完整序列
self.exec_count += 1
# 防止 delay=0 无限循环空转占满 CPU
await asyncio.sleep_ms(1)
except asyncio.CancelledError:
# cancel 属于正常退出路径
pass
finally:
# ==================================================
# 安全恢复(safe_value)
# ==================================================
if self.safe_value is not None:
try:
self.cb(self.safe_value)
except Exception:
# safe_value 恢复失败不应影响状态释放
pass
# ==================================================
# 状态清理(关键:释放 busy lock)
# ==================================================
self.running = False
self._task = None
self._busy = False
self._stopping = False