ArtOS 快速开发框架

ArtOS 快速开发框架

首页
ArtOS框架
使用文档视频教程
Micropython
官方文档学习教程固件开发
发现
树莓派ESP32
生态圈
智能生活直播物联工业物联Node-RED
开发资源
开发板软件资源器件资源
关于ArtOS
登录 →
ArtOS 快速开发框架

ArtOS 快速开发框架

首页 ArtOS框架
使用文档视频教程
Micropython
官方文档学习教程固件开发
发现
树莓派ESP32
生态圈
智能生活直播物联工业物联Node-RED
开发资源
开发板软件资源器件资源
关于ArtOS
登录
  1. 首页
  2. ArtOS框架
  3. 开发文档
  4. Micropython 指数退避执行器 BackoffExecutor

Micropython 指数退避执行器 BackoffExecutor

0
  • 开发文档
  • 发布于 2026-01-29
  • 8 次阅读
极客熊
极客熊
import uasyncio as asyncio
import urandom

class BackoffExecutor:
    """
    BackoffExecutor —— 指数退避执行器(支持 Full Jitter)
    =========================================================
    用于在“不可靠执行环境”下,对某个函数进行安全重试的通用执行器。

    核心能力:
        - Exponential Backoff(指数退避)
        - Full Jitter(随机抖动,避免惊群效应)
        - 防重入(同一时间只允许一次 execute)
        - async / sync 函数透明支持
        - 完整回调生命周期
        - 支持外部软取消
        - 兼容 MicroPython / uasyncio

    典型使用场景:
        - WiFi / MQTT 连接
        - 网络请求
        - 外设初始化
        - 不稳定 IO 操作
    """

    # ==========================================================
    # 构造与状态
    # ==========================================================
    def __init__(
        self,
        base_delay_ms=500,
        max_backoff_ms=10000,
        max_retry=5,
        jitter=False
    ):
        """
        构造函数

        :param base_delay_ms: int
            初始退避时间(毫秒)
            第 1 次失败后的等待时间基数

        :param max_backoff_ms: int
            最大退避时间(毫秒)
            防止指数增长导致等待时间过长

        :param max_retry: int
            最大重试次数
                -1  : 无限重试
                >=0 : 超过该次数后终止

        :param jitter: bool
            是否启用 Full Jitter
                True  : delay ∈ [0, backoff]
                False : delay = backoff
        """
        self.base_delay_ms = base_delay_ms
        self.max_backoff_ms = max_backoff_ms
        self.max_retry = max_retry
        self.jitter = jitter

        # asyncio 锁:用于防止 execute 并发重入
        self._lock = asyncio.Lock()

        # 执行器当前是否处于运行状态(业务语义)
        self._running = False

        # 外部取消标志(软中断,不会强杀当前 fn)
        self._cancel_requested = False

    @property
    def running(self):
        """
        当前执行器是否正在运行

        说明:
            - running 是“语义状态”
            - 不等同于 lock.locked()
        """
        return self._running

    def cancel(self):
        """
        请求取消当前执行流程

        行为说明:
            - 不会中断正在执行的 fn
            - 在下一次循环检查点安全退出
            - 适合用于任务生命周期管理
        """
        self._cancel_requested = True

    # ==========================================================
    # 退避策略(Backoff Strategy)
    # ==========================================================
    def _calc_delay(self, retry_count):
        """
        计算本次重试的退避时间(毫秒)

        指数退避公式:
            backoff = min(base * 2^(retry_count - 1), max)

        Full Jitter(推荐):
            delay = random(0, backoff)

        设计说明:
            - Full Jitter 可有效避免多实例同步重试
            - 特别适合网络 / MQTT / 云服务场景

        :param retry_count: int
            当前失败次数(从 1 开始)
        :return: int
            本次 sleep 的毫秒数
        """
        backoff = min(
            self.base_delay_ms * (2 ** (retry_count - 1)),
            self.max_backoff_ms
        )

        if not self.jitter:
            # 严格指数退避
            return backoff

        # Full Jitter:0 ~ backoff 的随机值
        # 使用 urandom,兼容 MicroPython
        return urandom.getrandbits(16) % (backoff + 1)

    # ==========================================================
    # 执行入口
    # ==========================================================
    async def execute(self, fn, on_success=None, on_fail=None, on_retry=None):
        """
        执行函数 fn,失败时自动进行指数退避重试

        执行流程:
            execute()
                └─ 调用 fn
                    ├─ 成功 → on_success → return True
                    ├─ 失败 → on_fail → backoff → retry
                    └─ 超过最大重试 → on_retry → return False

        :param fn: callable
            async 或 sync 函数
            把 fn 封装成一个零参函数再交给 execute() 或者传入 lambda: 函数名(参数)
            返回语义:
                True        : 成功,终止重试
                False       : 失败,进入重试
                Exception   : 失败,进入重试

        :param on_success: callable | None
            成功回调:
                on_success(result)

        :param on_fail: callable | None
            每次失败回调:
                on_fail(exception, retry_count, next_delay_ms)

        :param on_retry: callable | None
            重试耗尽回调:
                on_retry(last_exception)

        :return: bool
            True  : 最终成功
            False : 被取消 / 超过最大重试 / 重入被拒绝
        """

        # ======================================================
        # 防止并发重入
        # ======================================================
        if self._lock.locked():
            return False

        async with self._lock:
            self._running = True
            self._cancel_requested = False
            retry = 0

            try:
                while True:
                    # ==================================================
                    # 外部取消检测
                    # ==================================================
                    if self._cancel_requested:
                        return False

                    try:
                        # ==================================================
                        # 执行目标函数
                        # ==================================================
                        result = fn()
                        if hasattr(result, "send") and hasattr(result, "__next__"):
                            result = await result

                        # ==================================================
                        # 成功路径
                        # ==================================================
                        if result:
                            if on_success:
                                try:
                                    on_success_rec = on_success(result)
                                    if hasattr(on_success_rec, "send"):
                                        await on_success_rec
                                except Exception:
                                    # 回调异常不影响主流程
                                    pass
                            return True

                        # 返回 False 统一视为失败
                        raise OSError("Execution returned False")

                    except Exception as e:
                        # ==================================================
                        # 失败路径
                        # ==================================================
                        retry += 1
                        delay = self._calc_delay(retry)

                        if on_fail:
                            try:
                                on_fail_rec = on_fail(e, retry, delay)
                                if hasattr(on_fail_rec, "send"):
                                    await on_fail_rec
                            except Exception:
                                pass

                        # ==================================================
                        # 是否超过最大重试次数
                        # ==================================================
                        if self.max_retry != -1 and retry > self.max_retry:
                            if on_retry:
                                try:
                                    on_retry_rec = on_retry(e)
                                    if hasattr(on_retry_rec, "send"):
                                        await on_retry_rec
                                except Exception:
                                    pass
                            return False

                        # ==================================================
                        # 退避等待
                        # ==================================================
                        await asyncio.sleep_ms(delay)

            finally:
                # 确保任何退出路径都清理状态
                self._running = False

相关文章
Micropython 指数退避执行器 BackoffExecutor

Micropython 指数退避执行器 BackoffExecutor

import uasyncio as asyncio import urandom class BackoffExecutor: """ BackoffExecutor —— 指数退避执行器(支持 Full Jitter) =========================

Micropython 时序执行器 SequencerExecutor

Micropython 时序执行器 SequencerExecutor

import uasyncio as asyncio class SeqAction: """ SeqAction ================================================== 时序动作单元(Sequencer Action

目录
  • 极客熊
  • 极客熊
  • 极客熊
Copyright © 2026 ArtOS All Rights Reserved. Powered by ArtOS.