基于Dify认识工作流引擎:DAG调度

一、从痛点到原理

1.1 开发工作中的痛点

在日常业务开发和系统运维中,我们经常面临以下严峻挑战:

  • 多任务依赖混乱:
    • 场景:一个数据处理任务需要等待 A、B、C 三个上游任务完成后才能启动,而 D 任务只需要 A 完成即可。
    • 痛点:手动维护这种复杂的依赖关系(如通过 Cron 脚本轮询状态)极易出错,一旦某个环节延迟,整个链路阻塞且难以感知。
  • 调度低效:
    • 场景:传统线性执行模式(串行)导致资源闲置。例如,任务 B 不依赖任务 A,但必须等 A 跑完才启动。
    • 痛点:无法充分利用集群算力,整体耗时 = 所有任务耗时之和,效率低下。
  • 流程不可控:
    • 场景:任务执行中途失败,是重试?跳过?还是回滚?缺乏统一的策略。
    • 痛点:错误处理逻辑硬编码在业务代码中,耦合严重,难以动态调整。
  • 可追溯性差:
    • 场景:最终结果异常,需要排查是哪个环节、哪次输入导致了问题。
    • 痛点:缺乏完整的执行链路记录(Input/Output/Status),问题排查如“大海捞针”。

结论:我们需要一种能够自动管理依赖、最大化并发、统一容错且全程可观测的调度机制。这就是 DAG(有向无环图)调度 诞生的背景。

1.2 什么是 DAG?

在深入代码之前,我们先回顾一下DAG基础理论。

1.2.1 定义

DAG (Directed Acyclic Graph),即有向无环图。

  • 有向 (Directed):边有方向,代表任务执行的先后顺序(A → B 表示 A 必须在 B 之前)。
  • 无环 (Acyclic):图中不存在回路,即不可能从一个节点出发沿着箭头方向又能回到该节点(避免死循环)。
  • 图 (Graph):由节点 (Node) 和 边 (Edge) 组成的数据结构。

DAG

加权 DAG: 在实际工程和算法问题中,有时边带有权重,用来表示某种“代价”或“量化关系”。

1.2.2 核心三要素

要素 含义 业务映射
节点 (Node) 图中的基本单元 一个具体的任务(如:调用 LLM、执行 Python 代码、发送 HTTP 请求)
边 (Edge) 连接节点的有向线段 任务间的依赖关系(前置任务完成 -> 触发后置任务)
无环性 图中不存在闭环 保证流程必然能结束,不会出现“A 等 B,B 等 A”的死锁

1.2.3 常见的应用场景

场景 是否有权重 权重的含义 典型应用
任务调度/依赖关系 通常没有 仅表示 A 必须在 B 之前完成 工作流引擎、CI/CD 流水线、构建系统
最短/最长路径算法 表示距离、耗时、成本或利润 导航系统、网络路由、关键路径分析
动态规划(DP) 状态转移的代价或收益 最优子结构问题、背包问题、编辑距离
神经网络(计算图) 在深度学习中,边权重通常代表神经元之间的连接权重 深度学习框架(TensorFlow、PyTorch)、模型训练
分布式事务 通常没有 表示事务操作的先后顺序和依赖 两阶段提交(2PC)、Saga 模式

1.2.4 场景示例:视频转码流水线

假设我们要对一个视频进行处理:

DAG

  • 并行性:音频提取视频压缩 互不依赖,可以同时执行。
  • 依赖性:合并输出 必须等待 字幕生成视频压缩 都完成后才能开始。
  • 无环性:流程单向流动,绝不会从 合并封装 跳回 下载原始视频

1.3 DAG 是如何调度的?(核心原理)

理解了结构,DAG 引擎是如何驱动这些任务运行的?核心流程分为四步:

1.3.1 调度流程图解

1.3.2 详细步骤说明

  1. 解析与构建 (Parse & Build)

    • 读取配置文件(JSON/YAML),提取所有的节点定义和边关系

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      # 视频处理 DAG 工作流配置
      workflow:
      name: "视频转码流水线"
      version: "1.0"

      # 节点定义
      nodes:
      - id: download_video
      name: "下载原始视频"
      type: "http_request"
      config:
      url: "{{ input.video_url }}"
      method: "GET"
      output: "raw_video_path"

      - id: extract_audio
      name: "音频提取"
      type: "shell_command"
      config:
      command: "ffmpeg -i {{ download_video.raw_video_path }} -vn audio.mp3"
      depends_on: ["download_video"]
      output: "audio_path"

      - id: compress_video
      name: "视频压缩"
      type: "shell_command"
      config:
      command: "ffmpeg -i {{ download_video.raw_video_path }} -vcodec libx264 compressed.mp4"
      depends_on: ["download_video"]
      output: "compressed_video_path"

      - id: generate_subtitles
      name: "字幕生成"
      type: "ai_service"
      config:
      api: "whisper"
      input: "{{ extract_audio.audio_path }}"
      depends_on: ["extract_audio"]
      output: "subtitle_path"

      - id: merge_output
      name: "合并封装"
      type: "shell_command"
      config:
      command: "ffmpeg -i {{ compress_video.compressed_video_path }} -i {{ generate_subtitles.subtitle_path }} -c copy final_output.mp4"
      depends_on: ["compress_video", "generate_subtitles"]
      output: "final_video_path"
    • 在内存中构建图数据结构(邻接表或邻接矩阵)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      # 节点索引映射(用于矩阵定位)
      node_index = {
      "download_video": 0,
      "extract_audio": 1,
      "compress_video": 2,
      "generate_subtitles": 3,
      "merge_output": 4
      }

      # 邻接矩阵表示法 (Adjacency Matrix)
      # matrix[i][j] = 1 表示节点 i 指向节点 j,否则为 0
      adjacency_matrix = [
      # dv ea cv gs mo
      [0, 1, 1, 0, 0], # download_video (0)
      [0, 0, 0, 1, 0], # extract_audio (1)
      [0, 0, 0, 0, 1], # compress_video (2)
      [0, 0, 0, 0, 1], # generate_subtitles (3)
      [0, 0, 0, 0, 0] # merge_output (4)
      ]

      # 邻接表表示法 (Adjacency List) - 更节省空间
      # Key: 节点ID, Value: 该节点指向的所有后继节点列表
      adjacency_list = {
      "download_video": ["extract_audio", "compress_video"],
      "extract_audio": ["generate_subtitles"],
      "compress_video": ["merge_output"],
      "generate_subtitles": ["merge_output"],
      "merge_output": [] # 汇点,没有后继节点
      }

      # 反向邻接表(用于快速查找前置依赖)
      reverse_adjacency_list = {
      "download_video": [], # 源点,没有前置节点
      "extract_audio": ["download_video"],
      "compress_video": ["download_video"],
      "generate_subtitles": ["extract_audio"],
      "merge_output": ["compress_video", "generate_subtitles"]
      }

      选择建议:

      • DAG 工作流通常是稀疏图(每个节点只有少量依赖),推荐使用邻接表
      • 邻接矩阵适合需要频繁判断”任意两节点是否有边”的场景
      • 实际工程中(如 Dify),多采用邻接表 + 哈希表的组合方案
    • 关键点:此时只建立结构,不执行任何业务逻辑

  2. 拓扑排序 (Topological Sort)

    • 目的:确定一个合法的线性执行序列,或者验证图的合法性

    • 算法:通常使用 Kahn 算法DFS

    • 核心逻辑

      1
      2
      3
      4
      5
      6
      7
      8
      # 入度统计(用于拓扑排序)
      in_degree = {
      "download_video": 0,
      "extract_audio": 1,
      "compress_video": 1,
      "generate_subtitles": 1,
      "merge_output": 2
      }
      1. 统计每个节点的入度(有多少个前置任务)
      2. 将入度为 0 的节点加入队列
      3. 依次处理队列中的节点,将其后继节点的入度减 1,若减后入度为 0 则加入队列
      4. 如果最终处理的节点数等于总节点数,则无环;否则存在循环依赖,报错终止
      • 作用:提前发现配置错误,避免运行时死锁
  3. 就绪检测 (Ready Check)

    • 在运行时,引擎维护一个运行时状态表
    • 当一个节点执行成功后,将其所有下游节点的”已完成前置计数”减 1
    • 当某节点的”已完成前置计数”归零时,标记为 Ready(就绪)
  4. 并发执行 (Execution)

    • 调度器不断扫描 Ready 队列
    • 将 Ready 节点提交给线程池或协程池异步执行
    • 优势:互不依赖的分支天然并行,最大化利用资源
    • 状态流转RunningSuccess/Fail → 触发下游/报错

1.4 从理论到落地:以 Dify 为例解析 DAG 实践

理解了 DAG 的理论价值后,我们需要一个成熟的工程化案例来观察它是如何落地的。Dify 工作流引擎是业界基于 DAG 理论构建的典型代表,其内部完整实现了依赖管理、拓扑排序和并发调度机制。

本次分享将以 Dify 的源码实现为蓝本,深入剖析:

  • 理论映射:DAG 的抽象概念(节点、边、状态)在代码中如何具体定义。
  • 调度实战:拓扑排序算法如何在真实的高并发场景中被调用和执行。
  • 工程挑战:面对失败重试、断点续跑等复杂需求,DAG 引擎是如何扩展和解决的。

通过拆解 Dify 这个”标本”,我们不仅能掌握 DAG 调度的通用原理,更能获得一套可复用的架构设计思路,应用于公司未来的各类任务编排场景中。

二、DAG 调度的核心作用

承接第一章的原理,DAG 调度在实际业务中具体解决了哪些核心问题?

2.1 支撑复杂自动化任务流转

在 Dify 中,工作流不仅仅是简单的线性脚本,而是支持复杂逻辑的编排系统。它定义了多种节点类型来覆盖不同场景:

节点类型 英文标识 作用 典型场景
开始节点 START 流程入口,接收初始参数 API 触发、定时任务触发
大模型节点 LLM 调用 LLM 进行推理 文本生成、意图识别、润色
HTTP 请求 HTTP_REQUEST 调用外部 API 查询天气、调用内部微服务
代码执行 CODE 运行 Python/JS 片段 数据清洗、格式转换、复杂计算
模板转换 TEMPLATE_TRANSFORM 字符串模板渲染 构造 Prompt、格式化输出
知识检索 KNOWLEDGE_RETRIEVAL 向量数据库查询 RAG 场景下的上下文获取
工具调用 TOOL 调用预定义插件 搜索谷歌、执行 SQL、画图
结束节点 END 流程终点,输出最终结果 返回给用户响应

实际场景示例:智能客服工单处理

DAG 调度确保了:

  1. 检索和日志记录可以并行。
  2. 意图判断完成后,根据结果动态选择分支。
  3. 所有分支最终汇聚到 END 节点,保证流程完整性。

2.2 解决多模块协同依赖

在传统脚本中,模块间调用往往是硬编码的(func_a(); func_b())。而在 DAG 中,依赖是通过元数据描述的:

  • 解耦:节点 A 不需要知道节点 B 的存在,它只负责执行自己的逻辑并通知引擎“我完成了”。
  • 自动触发:引擎通过维护 GraphRuntimeState(运行时状态),自动计算哪些节点的所有上游已完成。
  • 数据传递:通过 VariablePool(变量池),上游节点的输出自动映射为下游节点的输入,无需手动传参。

核心价值:新增一个中间处理环节,只需在配置图中插入一个节点并连线,无需修改任何现有业务代码。

2.3 提升稳定性和可追溯性

Dify 基于 DAG 架构,内置了企业级的稳定性保障:

  1. 状态持久化:
    每一步执行状态(Running/Success/Failed)都实时写入数据库 (WorkflowRunRepository)。
    即使服务器重启,也能从断点恢复(取决于具体实现策略)。
  2. 精细化失败重试:
    支持针对特定节点配置重试策略(如:网络波动导致的 HTTP 失败重试 3 次)。
    局部失败不影响独立分支的执行。
  3. 执行限制保护:
    通过 ExecutionLimitsLayer 防止死循环或资源耗尽(如:最大执行步数限制、总超时时间限制)。
  4. 全链路可观测:
    每个节点的 Input、Output、Elapsed Time、Error Message 均有详细记录。
    提供可视化的执行轨迹,一键定位瓶颈或错误点。

三、Dify 的代码实现

本章我们将深入 Dify 源码(基于 graphon 引擎),看上述理论是如何转化为代码的。

3.1 核心目标

在代码层面,我们需要实现四个核心目标:

  1. 正确解析 DAG 结构:从 JSON 配置构建内存图。
  2. 保证执行顺序:严格遵守依赖约束,绝不违规执行。
  3. 高效并发:识别独立分支,利用多线程/协程并行处理。
  4. 状态一致:在并发环境下,确保变量池和运行状态的线程安全。

3.2 关键源码详解

3.2.1 图引擎:GraphEngine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
class GraphEngine:
"""
Graph Engine - DAG 工作流调度引擎
"""

def __init__(
self,
tenant_id: str,
app_id: str,
workflow_type: WorkflowType,
workflow_id: str,
user_id: str,
invoke_from: InvokeFrom,
) -> None:
"""
Initialize graph engine

初始化引擎上下文信息,为后续图构建和执行提供元数据
"""
self.tenant_id = tenant_id
self.app_id = app_id
self.workflow_type = workflow_type
self.workflow_id = workflow_id
self.user_id = user_id
self.invoke_from = invoke_from

def execute(
self,
workflow_run_id: str,
workflow_config: dict[str, Any],
user_inputs: dict[str, Any],
system_variables: dict[str, Any],
environment_variables: dict[str, Any],
session_variables: dict[str, Any],
*,
event_handler: Optional[Callable[[WorkflowEvent], None]] = None,
) -> Generator[WorkflowEvent, None, None]:
"""
Execute workflow - 执行 DAG 工作流
"""

# 将租户、应用等元数据封装,用于后续节点执行时的上下文传递
graph_init_params = GraphInitParams(
tenant_id=self.tenant_id,
app_id=self.app_id,
workflow_id=self.workflow_id,
workflow_run_id=workflow_run_id,
user_id=self.user_id,
invoke_from=self.invoke_from,
)

# 创建变量池 - 线程安全的数据存储,支持用户输入、系统变量、环境变量、会话变量的隔离存储
variable_pool = VariablePool(
user_inputs=user_inputs,
system_variables=system_variables,
environment_variables=environment_variables,
session_variables=session_variables,
)

# 创建图运行时状态 - 跟踪所有节点的执行状态
graph_runtime_state = GraphRuntimeState(
variable_pool=variable_pool,
)

# 创建图对象,解析 DAG 配置并构建内存图结构
graph = Graph(
workflow_config=workflow_config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

# 创建线程池,支持并发执行独立分支,max_workers=10 限制最大并发数,避免资源耗尽
thread_pool = GraphEngineThreadPool(max_workers=10)
try:
yield WorkflowStartedEvent(
workflow_run_id=workflow_run_id,
)

# 执行图 - 内部实现拓扑排序和就绪检测
# 1. 找到所有入度为0的节点(源点),标记为 Ready
# 2. 将 Ready 节点提交到线程池异步执行
# 3. 节点完成后,减少下游节点的 remaining_deps
# 4. 当某节点 remaining_deps 归零时,标记为 Ready 并提交执行
# 5. 重复步骤2-4,直到所有节点执行完成
# 这个过程严格保证了依赖顺序:只有当前置任务全部完成后,后置任务才会执行
for event in graph.execute(thread_pool=thread_pool):

if event_handler:
event_handler(event)

yield event

if isinstance(event, WorkflowCompletedEvent):
break

except Exception as e:
# 即使发生异常,也会发送失败事件,确保调用方能感知错误
yield WorkflowFailedEvent(
workflow_run_id=workflow_run_id,
error=str(e),
)

finally:
# 关闭线程池 - 资源清理,wait=True 确保所有正在执行的任务完成后才关闭,避免资源泄漏
thread_pool.shutdown(wait=True)

3.2.2 图引擎线程池:GraphEngineThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class GraphEngineThreadPool(ThreadPoolExecutor):
"""
Graph Engine Thread Pool - DAG 专用线程池
"""

def __init__(self, max_workers: Optional[int] = None) -> None:
"""
Initialize graph engine thread pool

继承自 ThreadPoolExecutor,增加任务追踪能力
"""
super().__init__(max_workers=max_workers)
# 【目标4】任务追踪字典 - 记录所有正在执行的任务
# Key: thread_pool_id (任务唯一标识)
# Value: Future 对象(代表异步任务的执行结果)
# 用于检查线程池是否已满、查询任务状态等
self._futures: dict[str, Future] = {}

def submit_task(
self, thread_pool_id: str, fn: Callable, *args: Any, **kwargs: Any
) -> Future:
"""
提交任务到线程池

- 将 DAG 中处于 Ready 状态的节点提交到线程池异步执行
- 互不依赖的节点可以并行执行,最大化利用 CPU 资源
"""
# 提交任务到父类 ThreadPoolExecutor,获取 Future 对象
future = self.submit(fn, *args, **kwargs)

# 这样可以通过 get_future() 查询任务状态,通过 is_full() 判断是否能提交新任务
self._futures[thread_pool_id] = future

# 添加完成回调 - 自动清理已完成的任务
future.add_done_callback(lambda _: self._futures.pop(thread_pool_id, None))

return future

def is_full(self) -> bool:
"""
检查线程池是否已满
"""
return len(self._futures) >= self._max_workers

def get_future(self, thread_pool_id: str) -> Optional[Future]:
"""
根据任务ID获取 Future 对象
"""
return self._futures.get(thread_pool_id)