通用的python执行命令封装实现

发布时间: 更新时间: 总字数:1321 阅读时间:3m 作者:IP:上海 网址

通用的python执行命令封装实现

实现

  1. stream 与 非 stream 双模式
    • 默认使用 subprocess.run 一次性捕获输出,性能最高且没有线程安全隐患。
    • 当设置 stream=True 时,底层切换为 subprocess.Popen,通过 bufsize=1 实现按行读取,并且stderr 重定向到了 stdout。这不仅能避免处理多个管道造成的线程死锁问题,还能保证终端上报错信息和正常信息的打印顺序是不错乱的。
  2. 安全的超时控制清理: 在 stream=True 的模式下,如果触发了超时异常,代码会自动捕获异常并执行 process.kill()wait(),防止僵尸进程 (Zombie Process) 的产生。
  3. 隔离的运行时环境run_env.update(env) 的操作使用了父进程环境的深拷贝,确保不会污染整个 Python 应用的全局环境变量。
  4. 日志的分级输出:成功时使用 .debug()(或在流模式下使用 .info() 逐行打印),失败时自动组装报错详情(附带 STDERR 的内容)并使用 .error() 输出。非常利于排查问题。
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# 注意:为了类型提示正确,推荐这样导入 Loguru 的 Logger 类型
from loguru import logger
from loguru._logger import Logger


@dataclass
class CmdResult:
    """封装 Shell 命令的执行结果"""
    command: str
    returncode: int
    stdout: str
    stderr: str
    timeout_expired: bool = False

    @property
    def success(self) -> bool:
        """命令是否成功执行 (返回码为0且未超时)"""
        return self.returncode == 0 and not self.timeout_expired


def bash(
    cmd: str,
    timeout: int | None = None,
    cwd: str | Path | None = None,
    env: dict[str, str] | None = None,
    stream: bool = False,
    log: Logger | None = None,
    raise_on_error: bool = False,
    log_on_error: bool = True
) -> CmdResult:
    """
    通用、健壮的 Shell 命令执行函数。

    :param cmd: 需要执行的 shell 命令
    :param timeout: 超时时间(秒)。超时后将强行终止命令。
    :param cwd: 执行命令的工作目录。
    :param env: 传递给子进程的额外环境变量。
    :param stream: 是否实时流式输出日志 (适用于耗时较长的命令)。开启后 stderr 会合并入 stdout。
    :param log: 传入的 loguru Logger 实例,如果为空则使用系统自带的 print 或忽略。
    :param raise_on_error: 当命令执行失败(非0退出码)或超时时,是否抛出异常。
    :param log_on_error: 当命令执行失败时,是否自动记录错误日志。
    :return: CmdResult 对象,包含返回码、标准输出和标准错误。
    """

    # 准备环境变量
    run_env = os.environ.copy()
    if env:
        run_env.update(env)

    # 转换 cwd 为字符串
    run_cwd = str(cwd) if cwd else None

    if log:
        log.debug(f"Executing: `{cmd}`" + (f" in {run_cwd}" if run_cwd else ""))

    stdout_lines = []
    stderr_lines = []
    returncode = -1
    timeout_expired = False

    try:
        if stream:
            # 流式读取:将 stderr 重定向到 stdout 以避免死锁,并实时读取
            process = subprocess.Popen(
                cmd,
                shell=True,
                executable="/bin/bash",
                cwd=run_cwd,
                env=run_env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT, # 合并输出
                text=True,                # 自动解码
                bufsize=1                 # 行缓冲
            )

            # 实时读取输出
            if process.stdout:
                for line in process.stdout:
                    clean_line = line.strip('\n')
                    stdout_lines.append(clean_line)
                    if log:
                        log.info(clean_line)
                    else:
                        print(clean_line)

            # 等待进程结束(带有超时)
            returncode = process.wait(timeout=timeout)

        else:
            # 非流式读取:一次性捕获并等待完成
            process = subprocess.run(
                cmd,
                shell=True,
                executable="/bin/bash",
                cwd=run_cwd,
                env=run_env,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            returncode = process.returncode
            if process.stdout:
                stdout_lines = process.stdout.splitlines()
            if process.stderr:
                stderr_lines = process.stderr.splitlines()

    except subprocess.TimeoutExpired as e:
        timeout_expired = True
        error_msg = f"Command timed out after {timeout} seconds: `{cmd}`"

        # 针对 Popen 流式读取模式,手动终止进程
        if stream and 'process' in locals():
            process.kill()
            process.wait()

        if log_on_error and log:
            log.error(error_msg)
        if raise_on_error:
            raise TimeoutError(error_msg) from e

    # 封装结果
    result = CmdResult(
        command=cmd,
        returncode=returncode,
        stdout="\n".join(stdout_lines).strip(),
        stderr="\n".join(stderr_lines).strip(),
        timeout_expired=timeout_expired
    )

    # 错误处理逻辑
    if not result.success and not timeout_expired:
        error_msg = f"Command failed with exit code {result.returncode}: `{cmd}`"
        if result.stderr:
            error_msg += f"\nSTDERR: {result.stderr}"
        elif result.stdout: # 流模式下 stderr 被合并到了 stdout
            error_msg += f"\nOUTPUT: {result.stdout}"

        if log_on_error and log:
            log.error(error_msg)

        if raise_on_error:
            raise RuntimeError(error_msg)

    elif result.success and log and not stream:
        log.debug(f"Command success. Output: {result.stdout}")

    return result


# ==========================================
# 🚀 测试与使用用例 (可以直接运行本脚本查看效果)
# ==========================================
if __name__ == "__main__":

    # 用例 1: 简单的命令执行 (捕获输出)
    logger.info("--- Test 1: Simple Command ---")
    res1 = bash("echo 'Hello World!'", log=logger)
    logger.success(f"Result: {res1.stdout}\n")

    # 用例 2: 包含环境变量与工作目录
    logger.info("--- Test 2: Env & Cwd ---")
    res2 = bash(
        "pwd && echo $MY_VAR",
        cwd="/tmp",
        env={"MY_VAR": "Python Expert"},
        log=logger
    )
    logger.success(f"Result:\n{res2.stdout}\n")

    # 用例 3: 实时流式输出 (模拟耗时任务)
    logger.info("--- Test 3: Stream Output ---")
    bash(
        "for i in {1..3}; do echo 'Processing step $i...'; sleep 1; done",
        stream=True,
        log=logger
    )
    logger.success("Stream test finished.\n")

    # 用例 4: 错误处理与日志记录 (不抛出异常,只打错误日志)
    logger.info("--- Test 4: Error Handling ---")
    res4 = bash("ls /path/to/non_existent_directory", log=logger, log_on_error=True)
    logger.warning(f"Was it successful? {res4.success} (Return Code: {res4.returncode})\n")

    # 用例 5: 超时控制 (主动抛出异常)
    logger.info("--- Test 5: Timeout Control ---")
    try:
        bash("sleep 5", timeout=2, log=logger, raise_on_error=True)
    except TimeoutError as e:
        logger.error(f"Successfully caught expected error: {e}")