通用的python执行命令封装实现
实现
stream与 非stream双模式:- 默认使用
subprocess.run一次性捕获输出,性能最高且没有线程安全隐患。 - 当设置
stream=True时,底层切换为subprocess.Popen,通过bufsize=1实现按行读取,并且将stderr重定向到了stdout。这不仅能避免处理多个管道造成的线程死锁问题,还能保证终端上报错信息和正常信息的打印顺序是不错乱的。
- 默认使用
- 安全的超时控制清理:
在
stream=True的模式下,如果触发了超时异常,代码会自动捕获异常并执行process.kill()和wait(),防止僵尸进程 (Zombie Process) 的产生。 - 隔离的运行时环境:
run_env.update(env)的操作使用了父进程环境的深拷贝,确保不会污染整个 Python 应用的全局环境变量。 - 日志的分级输出:成功时使用
.debug()(或在流模式下使用.info()逐行打印),失败时自动组装报错详情(附带 STDERR 的内容)并使用.error()输出。非常利于排查问题。
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# 注意:为了类型提示正确,推荐这样导入 Loguru 的 Logger 类型
from loguru import logger
from loguru._logger import Logger
@dataclass
class CmdResult:
"""封装 Shell 命令的执行结果"""
command: str
returncode: int
stdout: str
stderr: str
timeout_expired: bool = False
@property
def success(self) -> bool:
"""命令是否成功执行 (返回码为0且未超时)"""
return self.returncode == 0 and not self.timeout_expired
def bash(
cmd: str,
timeout: int | None = None,
cwd: str | Path | None = None,
env: dict[str, str] | None = None,
stream: bool = False,
log: Logger | None = None,
raise_on_error: bool = False,
log_on_error: bool = True
) -> CmdResult:
"""
通用、健壮的 Shell 命令执行函数。
:param cmd: 需要执行的 shell 命令
:param timeout: 超时时间(秒)。超时后将强行终止命令。
:param cwd: 执行命令的工作目录。
:param env: 传递给子进程的额外环境变量。
:param stream: 是否实时流式输出日志 (适用于耗时较长的命令)。开启后 stderr 会合并入 stdout。
:param log: 传入的 loguru Logger 实例,如果为空则使用系统自带的 print 或忽略。
:param raise_on_error: 当命令执行失败(非0退出码)或超时时,是否抛出异常。
:param log_on_error: 当命令执行失败时,是否自动记录错误日志。
:return: CmdResult 对象,包含返回码、标准输出和标准错误。
"""
# 准备环境变量
run_env = os.environ.copy()
if env:
run_env.update(env)
# 转换 cwd 为字符串
run_cwd = str(cwd) if cwd else None
if log:
log.debug(f"Executing: `{cmd}`" + (f" in {run_cwd}" if run_cwd else ""))
stdout_lines = []
stderr_lines = []
returncode = -1
timeout_expired = False
try:
if stream:
# 流式读取:将 stderr 重定向到 stdout 以避免死锁,并实时读取
process = subprocess.Popen(
cmd,
shell=True,
executable="/bin/bash",
cwd=run_cwd,
env=run_env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 合并输出
text=True, # 自动解码
bufsize=1 # 行缓冲
)
# 实时读取输出
if process.stdout:
for line in process.stdout:
clean_line = line.strip('\n')
stdout_lines.append(clean_line)
if log:
log.info(clean_line)
else:
print(clean_line)
# 等待进程结束(带有超时)
returncode = process.wait(timeout=timeout)
else:
# 非流式读取:一次性捕获并等待完成
process = subprocess.run(
cmd,
shell=True,
executable="/bin/bash",
cwd=run_cwd,
env=run_env,
capture_output=True,
text=True,
timeout=timeout
)
returncode = process.returncode
if process.stdout:
stdout_lines = process.stdout.splitlines()
if process.stderr:
stderr_lines = process.stderr.splitlines()
except subprocess.TimeoutExpired as e:
timeout_expired = True
error_msg = f"Command timed out after {timeout} seconds: `{cmd}`"
# 针对 Popen 流式读取模式,手动终止进程
if stream and 'process' in locals():
process.kill()
process.wait()
if log_on_error and log:
log.error(error_msg)
if raise_on_error:
raise TimeoutError(error_msg) from e
# 封装结果
result = CmdResult(
command=cmd,
returncode=returncode,
stdout="\n".join(stdout_lines).strip(),
stderr="\n".join(stderr_lines).strip(),
timeout_expired=timeout_expired
)
# 错误处理逻辑
if not result.success and not timeout_expired:
error_msg = f"Command failed with exit code {result.returncode}: `{cmd}`"
if result.stderr:
error_msg += f"\nSTDERR: {result.stderr}"
elif result.stdout: # 流模式下 stderr 被合并到了 stdout
error_msg += f"\nOUTPUT: {result.stdout}"
if log_on_error and log:
log.error(error_msg)
if raise_on_error:
raise RuntimeError(error_msg)
elif result.success and log and not stream:
log.debug(f"Command success. Output: {result.stdout}")
return result
# ==========================================
# 🚀 测试与使用用例 (可以直接运行本脚本查看效果)
# ==========================================
if __name__ == "__main__":
# 用例 1: 简单的命令执行 (捕获输出)
logger.info("--- Test 1: Simple Command ---")
res1 = bash("echo 'Hello World!'", log=logger)
logger.success(f"Result: {res1.stdout}\n")
# 用例 2: 包含环境变量与工作目录
logger.info("--- Test 2: Env & Cwd ---")
res2 = bash(
"pwd && echo $MY_VAR",
cwd="/tmp",
env={"MY_VAR": "Python Expert"},
log=logger
)
logger.success(f"Result:\n{res2.stdout}\n")
# 用例 3: 实时流式输出 (模拟耗时任务)
logger.info("--- Test 3: Stream Output ---")
bash(
"for i in {1..3}; do echo 'Processing step $i...'; sleep 1; done",
stream=True,
log=logger
)
logger.success("Stream test finished.\n")
# 用例 4: 错误处理与日志记录 (不抛出异常,只打错误日志)
logger.info("--- Test 4: Error Handling ---")
res4 = bash("ls /path/to/non_existent_directory", log=logger, log_on_error=True)
logger.warning(f"Was it successful? {res4.success} (Return Code: {res4.returncode})\n")
# 用例 5: 超时控制 (主动抛出异常)
logger.info("--- Test 5: Timeout Control ---")
try:
bash("sleep 5", timeout=2, log=logger, raise_on_error=True)
except TimeoutError as e:
logger.error(f"Successfully caught expected error: {e}")