import uasyncio as asyncio
import urandom
class BackoffExecutor:
"""
BackoffExecutor —— 指数退避执行器(支持 Full Jitter)
=========================================================
用于在“不可靠执行环境”下,对某个函数进行安全重试的通用执行器。
核心能力:
- Exponential Backoff(指数退避)
- Full Jitter(随机抖动,避免惊群效应)
- 防重入(同一时间只允许一次 execute)
- async / sync 函数透明支持
- 完整回调生命周期
- 支持外部软取消
- 兼容 MicroPython / uasyncio
典型使用场景:
- WiFi / MQTT 连接
- 网络请求
- 外设初始化
- 不稳定 IO 操作
"""
# ==========================================================
# 构造与状态
# ==========================================================
def __init__(
self,
base_delay_ms=500,
max_backoff_ms=10000,
max_retry=5,
jitter=False
):
"""
构造函数
:param base_delay_ms: int
初始退避时间(毫秒)
第 1 次失败后的等待时间基数
:param max_backoff_ms: int
最大退避时间(毫秒)
防止指数增长导致等待时间过长
:param max_retry: int
最大重试次数
-1 : 无限重试
>=0 : 超过该次数后终止
:param jitter: bool
是否启用 Full Jitter
True : delay ∈ [0, backoff]
False : delay = backoff
"""
self.base_delay_ms = base_delay_ms
self.max_backoff_ms = max_backoff_ms
self.max_retry = max_retry
self.jitter = jitter
# asyncio 锁:用于防止 execute 并发重入
self._lock = asyncio.Lock()
# 执行器当前是否处于运行状态(业务语义)
self._running = False
# 外部取消标志(软中断,不会强杀当前 fn)
self._cancel_requested = False
@property
def running(self):
"""
当前执行器是否正在运行
说明:
- running 是“语义状态”
- 不等同于 lock.locked()
"""
return self._running
def cancel(self):
"""
请求取消当前执行流程
行为说明:
- 不会中断正在执行的 fn
- 在下一次循环检查点安全退出
- 适合用于任务生命周期管理
"""
self._cancel_requested = True
# ==========================================================
# 退避策略(Backoff Strategy)
# ==========================================================
def _calc_delay(self, retry_count):
"""
计算本次重试的退避时间(毫秒)
指数退避公式:
backoff = min(base * 2^(retry_count - 1), max)
Full Jitter(推荐):
delay = random(0, backoff)
设计说明:
- Full Jitter 可有效避免多实例同步重试
- 特别适合网络 / MQTT / 云服务场景
:param retry_count: int
当前失败次数(从 1 开始)
:return: int
本次 sleep 的毫秒数
"""
backoff = min(
self.base_delay_ms * (2 ** (retry_count - 1)),
self.max_backoff_ms
)
if not self.jitter:
# 严格指数退避
return backoff
# Full Jitter:0 ~ backoff 的随机值
# 使用 urandom,兼容 MicroPython
return urandom.getrandbits(16) % (backoff + 1)
# ==========================================================
# 执行入口
# ==========================================================
async def execute(self, fn, on_success=None, on_fail=None, on_retry=None):
"""
执行函数 fn,失败时自动进行指数退避重试
执行流程:
execute()
└─ 调用 fn
├─ 成功 → on_success → return True
├─ 失败 → on_fail → backoff → retry
└─ 超过最大重试 → on_retry → return False
:param fn: callable
async 或 sync 函数
把 fn 封装成一个零参函数再交给 execute() 或者传入 lambda: 函数名(参数)
返回语义:
True : 成功,终止重试
False : 失败,进入重试
Exception : 失败,进入重试
:param on_success: callable | None
成功回调:
on_success(result)
:param on_fail: callable | None
每次失败回调:
on_fail(exception, retry_count, next_delay_ms)
:param on_retry: callable | None
重试耗尽回调:
on_retry(last_exception)
:return: bool
True : 最终成功
False : 被取消 / 超过最大重试 / 重入被拒绝
"""
# ======================================================
# 防止并发重入
# ======================================================
if self._lock.locked():
return False
async with self._lock:
self._running = True
self._cancel_requested = False
retry = 0
try:
while True:
# ==================================================
# 外部取消检测
# ==================================================
if self._cancel_requested:
return False
try:
# ==================================================
# 执行目标函数
# ==================================================
result = fn()
if hasattr(result, "send") and hasattr(result, "__next__"):
result = await result
# ==================================================
# 成功路径
# ==================================================
if result:
if on_success:
try:
on_success_rec = on_success(result)
if hasattr(on_success_rec, "send"):
await on_success_rec
except Exception:
# 回调异常不影响主流程
pass
return True
# 返回 False 统一视为失败
raise OSError("Execution returned False")
except Exception as e:
# ==================================================
# 失败路径
# ==================================================
retry += 1
delay = self._calc_delay(retry)
if on_fail:
try:
on_fail_rec = on_fail(e, retry, delay)
if hasattr(on_fail_rec, "send"):
await on_fail_rec
except Exception:
pass
# ==================================================
# 是否超过最大重试次数
# ==================================================
if self.max_retry != -1 and retry > self.max_retry:
if on_retry:
try:
on_retry_rec = on_retry(e)
if hasattr(on_retry_rec, "send"):
await on_retry_rec
except Exception:
pass
return False
# ==================================================
# 退避等待
# ==================================================
await asyncio.sleep_ms(delay)
finally:
# 确保任何退出路径都清理状态
self._running = False