坑边闲话 :许多 Python 程序员对异步编程、并发编程可能有所耳闻,但是并没有做过真实的项目。本文尝试以批量处理 ffmpeg 任务为例,讲解如何使用 Python 的并发编程组件。Python 并不擅长计算,但擅长调度,因此本文的技术范式是把计算交给 ffmpeg,把控制留给 Python。后续如果有类似的计算需求,也可以直接迁移使用。
假设你有一个任务:把硬盘里几百个视频文件批量转码成 H.265 格式。
最简单的办法是写一个循环,用 subprocess 一个一个地调用 ffmpeg。跑起来没问题,但如果你的机器有 16 个核,转码时你会发现 CPU 占用率只有 6% 左右——因为你一次只开了一个 ffmpeg 进程,剩下 15 个核全在闲着。这篇文章会带你用 Python 的 concurrent.futures.ProcessPoolExecutor 构建一个并行转码调度器,充分利用多核 CPU,同时支持 Ctrl C 中断,并能正确终止所有 ffmpeg 子进程。
1. 问题背景·
ffmpeg 转码是典型的 CPU 密集型任务。一个 4K 视频转码可能要跑几分钟,期间 CPU 满载。顺序执行时,每次只有一个核心在工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 import subprocessvideo_files = ["a.mp4" , "b.mp4" , "c.mp4" ] for input_file in video_files: output_file = input_file.replace(".mp4" , ".hevc.mp4" ) subprocess.run([ "ffmpeg" , "-i" , input_file, "-c:v" , "libx265" , "-crf" , "28" , output_file ]) print (f"完成:{input_file} " )
这段代码的问题很直接:
效率低 :16 核机器只用了 1 个核
无并发 :任务串行,总时间 = 所有任务时间之和
Ctrl C 行为不可控 :中断后 ffmpeg 进程可能残留在后台
我们想要的是:同时跑 N 个 ffmpeg 进程(N = CPU 核心数),任意一个完成就打印进度,按 Ctrl C 时能干净地退出。
2. ProcessPoolExecutor 并行调度·
2.1 两种执行器·
Python 的 concurrent.futures 模块提供了两种执行器(Executor):
ThreadPoolExecutor:线程池,适合 I/O 密集型任务
ProcessPoolExecutor:进程池,适合 CPU 密集型任务
这里我们用 ProcessPoolExecutor。原因是 Python 有 GIL(全局解释器锁),多线程无法真正并行执行 CPU 密集型代码;而多进程没有这个限制。
不过需要注意:ffmpeg 本身是外部进程,Python 代码只是负责启动它,所以 GIL 并不是核心瓶颈。使用 ProcessPoolExecutor 的主要好处是:每个 worker 进程独立,崩溃不会影响主进程,信号处理也更清晰(后面会细说)。
2.1 submit() 和 Future 对象·
ProcessPoolExecutor 的核心用法是 submit():
1 2 3 4 from concurrent.futures import ProcessPoolExecutorwith ProcessPoolExecutor(max_workers=4 ) as executor: future = executor.submit(some_function, arg1, arg2)
submit() 会把任务提交给进程池,立即返回 一个 Future 对象,而不会等任务完成。
Future 是这个模型的核心概念。中文直译是"未来",含义也正是如此:它代表一个"将来某时会得到结果的任务"。你可以把它想象成一张取票号——你提交任务后拿到票,任务还在跑,你可以去干别的事,等叫号了再来取结果。
1 2 3 4 5 future = executor.submit(transcode, "video.mp4" )
一个基础的并行调度示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import subprocessfrom concurrent.futures import ProcessPoolExecutorimport osdef transcode (input_file ): output_file = input_file.replace(".mp4" , ".hevc.mp4" ) subprocess.run([ "ffmpeg" , "-i" , input_file, "-c:v" , "libx265" , "-crf" , "28" , "-y" , output_file ], check=True ) return input_file video_files = ["a.mp4" , "b.mp4" , "c.mp4" , "d.mp4" ] cpu_count = os.cpu_count() with ProcessPoolExecutor(max_workers=cpu_count) as executor: futures = [executor.submit(transcode, f) for f in video_files] for future in futures: result = future.result() print (f"完成:{result} " )
这已经是并行了——所有任务几乎同时提交,多个 ffmpeg 进程会并行跑。但还有一个问题:future.result() 是按提交顺序等待的。如果第一个任务跑得慢,即使第 2、3 个任务早就完成了,你也要等第一个完成才能打印进度。
3. Futures 与 as_completed 用例·
as_completed() 解决了上面的问题。它接受一个 Future 列表,返回一个迭代器,按完成顺序 逐个 yield Future, 具体说来就是哪个先完成,哪个先出来。
1 2 3 4 5 6 7 8 9 10 11 12 from concurrent.futures import ProcessPoolExecutor, as_completedwith ProcessPoolExecutor(max_workers=cpu_count) as executor: futures = {executor.submit(transcode, f): f for f in video_files} for future in as_completed(futures): filename = futures[future] try : result = future.result() print (f"[完成] {filename} " ) except Exception as e: print (f"[失败] {filename} :{e} " )
这里用了一个小技巧:用字典 {future: filename} 把 Future 和原始文件名关联起来,这样任务完成时能知道是哪个文件。
as_completed() 的好处很实际:
实时进度 :任务一完成就立即打印,不用等所有任务结束
快速失败 :某个任务失败时立即处理,而不是等到最后
自然的流式处理 :适合任务数量多、耗时差异大的场景
as_completed() 函数的实现原理
concurrent.futures.as_completed() 的实现其实很有意思,它不是轮询(polling),而是基于 Future 完成时触发回调以及条件变量、队列通知的机制。每个 Future 完成时主动通知一个等待器(waiter),等待器把它放进「已完成队列」,as_completed() 再从这个队列里逐个 yield。所以它是事件驱动,不是不断检查状态。
4. Ctrl C 中断的问题·
现在来说一个真实的工程困扰。
4.1 SIGINT 信号是怎么工作的·
在 Unix/Linux/macOS 中,按下 Ctrl C 不是直接杀进程,而是向当前终端的前台进程组 发送 SIGINT (Signal Interrupt,信号编号 2)。
"进程组"的概念是:一组相关进程的集合,通常一个 shell 命令和它的子进程在同一个进程组里。信号发给进程组时,组内所有进程 都会收到。
这意味着:当你的 Python 脚本(主进程)调用了 ProcessPoolExecutor(worker 进程),并且 worker 进程又启动了 ffmpeg 子进程,按下 Ctrl C 时,Python 主进程、所有 worker 进程、以及所有 ffmpeg 进程都会同时收到 SIGINT 。
4.2 为什么还是会残留进程·
问题出在竞态条件上。以下是可能发生的顺序:
用户按 Ctrl C
所有进程同时收到 SIGINT
Python 主进程捕获到 KeyboardInterrupt,开始清理并退出
ffmpeg 收到 SIGINT, 但它可能正在写文件 ,需要一点时间处理信号
Python 的 ProcessPoolExecutor 退出时,with 块会调用 executor.shutdown(wait=True),等待所有 worker 完成——但 worker 进程本身也收到了 SIGINT, 行为变得不可预测
更糟糕的情况:如果 ffmpeg 忽略了 SIGINT(某些情况下会),或者 Python 的进程池在清理期间出了问题,你就会在系统里留下孤儿 ffmpeg 进程,继续跑在后台,占着 CPU 和磁盘 I/O。
5. 使用独立进程组隔离 ffmpeg·
5.1 进程隔离·
解决思路是:让 ffmpeg 进程不在我们的进程组里 ,这样终端发来的 SIGINT 就不会直接打到 ffmpeg,而是只打到 Python,由 Python 来决定是否以及如何终止 ffmpeg。
做法是在启动 ffmpeg 时加 start_new_session=True:
1 2 3 4 proc = subprocess.Popen( ["ffmpeg" , "-i" , input_file, ...], start_new_session=True )
5.1 进程组和 session·
process group (进程组):一组进程共享同一个 PGID(进程组 ID),通常是父子关系的进程
session (会话):一个或多个进程组的集合,通常对应一个终端连接
start_new_session=True 的效果是:为 ffmpeg 创建一个全新的 session,ffmpeg 成为该 session 的 session leader,同时也是新进程组的组长。
结果是:ffmpeg 的进程组 ID(PGID)等于它自己的 PID,和我们的 Python 进程完全隔离。终端发来的 SIGINT 不再能直接到达 ffmpeg,信号只到达 Python。Python 可以完整地控制什么时候、用什么方式终止 ffmpeg。
6. 正确终止 ffmpeg 进程组·
隔离之后,Python 负责在中断时主动终止 ffmpeg。这里要用 os.killpg(),而不是 proc.kill()。
6.1 为什么不能只 kill 一个 PID·
ffmpeg 在某些操作下会 fork 子进程(例如处理某些格式时)。如果你只 kill 了 ffmpeg 的主 PID,它的子进程可能继续运行。
os.killpg(pgid, signal) 会向整个进程组发送信号,不管组里有几个进程,全部终止。
由于我们用了 start_new_session=True,ffmpeg 的 PGID 就等于它的 PID,所以:
1 2 3 4 import osimport signalos.killpg(proc.pid, signal.SIGTERM)
6.2 最终示例·
下面是一个健壮的 worker 函数示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import subprocessimport osimport signaldef transcode_worker (input_file ): output_file = input_file.replace(".mp4" , ".hevc.mp4" ) cmd = [ "ffmpeg" , "-i" , input_file, "-c:v" , "libx265" , "-crf" , "28" , "-y" , output_file ] proc = subprocess.Popen( cmd, start_new_session=True , stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) try : _, stderr = proc.communicate() if proc.returncode != 0 : raise RuntimeError(f"ffmpeg 失败:{stderr.decode()} " ) return input_file except KeyboardInterrupt: try : os.killpg(proc.pid, signal.SIGTERM) proc.wait(timeout=5 ) except (ProcessLookupError, subprocess.TimeoutExpired): try : os.killpg(proc.pid, signal.SIGKILL) except ProcessLookupError: pass raise
几点说明:
proc.communicate() 会阻塞直到 ffmpeg 结束,同时读取 stderr(用于错误信息)
捕获 KeyboardInterrupt 是因为 worker 进程本身也可能在某些情况下收到信号
先发 SIGTERM(优雅退出),等待 5 秒,超时后再发 SIGKILL(强制终止)
ProcessLookupError 表示进程已经不存在,忽略即可
7. 思考:调度层与计算层·
7.1 全局概览·
这个设计有一个值得强调的工程思路。
整个系统分两层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ┌──────────────────────────────────────────────┐ │ Python Orchestration Layer │ │ │ │ • Task queue management │ │ • Concurrency control (ProcessPoolExecutor) │ │ • Progress tracking (as_completed) │ │ • Signal handling (SIGINT → killpg) │ └─────────────────────┬────────────────────────┘ │ subprocess.Popen ┌──────────┼──────────┐ ▼ ▼ ▼ ffmpeg ffmpeg ffmpeg Process 1 Process 2 Process 3 │ CPU-intensive Compute Layer
Python 本身并不做转码计算,它只负责调度和控制 :决定开几个进程、什么时候开、出错怎么处理、怎么退出。真正消耗 CPU 的计算全部在 ffmpeg 里。
这是"用 Python 调度外部工具"的典型模式。Python 在这里是 orchestration layer (编排层),ffmpeg 是 compute layer (计算层)。这种分工让你可以充分利用成熟外部工具的性能,同时用 Python 的灵活性来处理控制逻辑。
7.2 关于 async/await 的补充·
有些读者可能听说过用 asyncio 做并发,也就是 Python 的异步 IO 模型。简单介绍一下语义:
async def 定义一个协程函数(coroutine),await 表示"在这里暂停,等待某个异步操作完成,期间让出控制权给其他协程"。整个系统只有一个线程,靠事件循环来调度多个协程,适合大量并发但每次操作耗时短的场景(比如同时发几千个 HTTP 请求)。
1 2 3 4 5 6 7 8 async def fetch_url (url ): response = await http_client.get(url) return response async def main (): results = await asyncio.gather(fetch_url(url1), fetch_url(url2))
对于 ffmpeg 这种任务,理论上也可以用 asyncio.create_subprocess_exec() 来管理子进程。但实际上 ProcessPoolExecutor 更适合这个场景:
ffmpeg 任务是 CPU 密集型,不是 IO 密集型,asyncio 的优势体现不出来
进程数量有限(通常等于 CPU 核心数),不需要 asyncio 擅长的"海量并发"
ProcessPoolExecutor + as_completed() 的代码更简单直接,心智负担更低
8. 完整示例·
下面是一个可以直接运行的完整版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 """ 可中断的 ffmpeg 并行转码调度器 用法: python transcoder.py video1.mp4 video2.mp4 video3.mp4 或批量: python transcoder.py *.mp4 """ import osimport signalimport subprocessimport sysfrom concurrent.futures import ProcessPoolExecutor, as_completeddef transcode_worker (input_file: str ) -> str : """ 在 worker 进程中执行 ffmpeg 转码。 使用 start_new_session=True 隔离进程组,以便安全地 killpg。 """ output_file = os.path.splitext(input_file)[0 ] + ".hevc.mp4" cmd = [ "ffmpeg" , "-i" , input_file, "-c:v" , "libx265" , "-crf" , "28" , "-c:a" , "copy" , "-y" , output_file, ] proc = subprocess.Popen( cmd, start_new_session=True , stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) try : _, stderr = proc.communicate() if proc.returncode != 0 : error_msg = stderr.decode(errors="replace" ).strip() raise RuntimeError(f"ffmpeg 退出码 {proc.returncode} :{error_msg[-500 :]} " ) return output_file except KeyboardInterrupt: _kill_process_group(proc) raise def _kill_process_group (proc: subprocess.Popen ) -> None : """向 ffmpeg 进程组发送 SIGTERM,超时后发 SIGKILL。""" try : os.killpg(proc.pid, signal.SIGTERM) proc.wait(timeout=5 ) except subprocess.TimeoutExpired: try : os.killpg(proc.pid, signal.SIGKILL) except ProcessLookupError: pass except ProcessLookupError: pass def run_parallel_transcode (video_files: list [str ] ) -> None : cpu_count = os.cpu_count() or 1 total = len (video_files) completed = 0 failed = 0 print (f"开始转码 {total} 个文件,并发数:{cpu_count} " ) future_to_file: dict = {} try : with ProcessPoolExecutor(max_workers=cpu_count) as executor: for f in video_files: future = executor.submit(transcode_worker, f) future_to_file[future] = f for future in as_completed(future_to_file): input_file = future_to_file[future] try : output_file = future.result() completed += 1 print (f"[{completed + failed} /{total} ] 完成:{input_file} → {output_file} " ) except KeyboardInterrupt: raise except Exception as e: failed += 1 print (f"[{completed + failed} /{total} ] 失败:{input_file} ,原因:{e} " ) except KeyboardInterrupt: print ("\n收到中断信号,正在退出..." ) print (f"\n完成:{completed} 个,失败:{failed} 个,总计:{total} 个" ) if __name__ == "__main__" : if len (sys.argv) < 2 : print ("用法:python transcoder.py <file1.mp4> [file2.mp4 ...]" ) sys.exit(1 ) video_files = sys.argv[1 :] run_parallel_transcode(video_files)
运行示例:
1 2 3 4 5 6 7 8 $ python transcoder.py *.mp4 开始转码 8 个文件,并发数:8 [1/8] 完成:short_clip.mp4 → short_clip.hevc.mp4 [2/8] 完成:intro.mp4 → intro.hevc.mp4 ^C 收到中断信号,正在退出... 完成:2 个,失败:0 个,总计:8 个
按 Ctrl C 后,所有 ffmpeg 进程会在几秒内干净地退出,不留残留进程。
技术点
作用
ProcessPoolExecutor
管理进程池,充分利用多核 CPU
submit() + Future
异步提交任务,立即返回占位符
as_completed()
按完成顺序处理任务,实现实时进度
start_new_session=True
隔离 ffmpeg 进程组,避免信号竞态
os.killpg()
终止整个进程组,包括可能的子进程
这个调度器的核心思路是:Python 只做控制,ffmpeg 负责计算。把两层职责分清楚,信号处理和错误处理才能写得干净。