Python paramiko 模拟 SSH 登陆 Linux 执行命令

发布时间: 更新时间: 总字数:878 阅读时间:2m 作者: IP上海 分享 网址

在使用 Python 写脚本时,需要登陆 Linux 服务器去执行命令,并获取返回结果

Shell 代码

ssh -p 22 -o StrictHostKeyChecking=no user@ip "ip a"

Python 代码

# -*- coding: utf-8 -*-
import paramiko
import re

from collections import namedtuple
from six import StringIO


def dict_to_namedtuple(dic):
    """从dict转换到namedtuple"""
    return namedtuple('AttrStore', dic.keys())(**dic)


def choices_to_namedtuple(choices):
    """从django-model的choices转换到namedtuple"""
    return dict_to_namedtuple(dict(choices))


def tuple_choices(tupl):
    """从django-model的choices转换到namedtuple"""
    return [(t, t) for t in tupl]


AUTH_TUPLE = ('PASSWORD', 'KEY', 'CERT_KEY', 'TJJ_PASSWORD')
AUTH_CHOICES = tuple_choices(AUTH_TUPLE)
AuthType = choices_to_namedtuple(AUTH_CHOICES)

DEFAULT_SSH_KEY_PATH = '/root/.ssh/id_rsa'
PARAMIKO_LOG_PATH = '/tmp/paramiko.log'


class ShellMan:

    def __init__(self, account, ip, port=22, password=None, key=None,
                 auth_type=AuthType.PASSWORD, timeout=10, auto_set_prompt=True):
        self.account = account
        self.ip = ip
        self.port = port
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy())
        # self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # self.ssh.load_system_host_keys()
        # 密钥认证
        if auth_type == AuthType.KEY:
            if key:
                pkey = paramiko.RSAKey.from_private_key(StringIO(key))
            else:
                pkey = paramiko.RSAKey.from_private_key_file(DEFAULT_SSH_KEY_PATH)
            paramiko.util.log_to_file(PARAMIKO_LOG_PATH)
            self.ssh.connect(hostname=self.ip, username=self.account, port=self.port, pkey=pkey, timeout=timeout)

        # 密码认证
        else:
            self.ssh.connect(self.ip, username=self.account, password=password, port=self.port, timeout=timeout)

        channel = self.ssh.invoke_shell()
        self.std_in = channel.makefile('wb')
        self.std_out = channel.makefile('r')

        self._input_flag = '_SSHProxy'
        if auto_set_prompt:
            _, _ = self._set_prompt()

    def __del__(self):
        self.ssh.close()

    def _set_prompt(self):
        # 清空 ssh 产生的 history
        _, _, _ = self.execute('export HISTSIZE=0')

        # 设置新的终端提示符,清除 welcome 数据
        cmd = r'export PS1="[\u@\h{} \W]\$"'.format(self._input_flag)
        _, _std_out, _std_err = self.execute(cmd)
        return _std_out, _std_err

    def execute(self, cmd):
        """
        run shell command

        :param cmd: the command to be executed on the remote computer
        :examples:
          execute('ls')
          execute('cd folder_name')
        """
        cmd = cmd.strip('\n')
        self.std_in.write(cmd + '\n')
        finish = 'SSHProxy exec done'
        echo_cmd = f'echo {finish} $?'
        self.std_in.write(echo_cmd + '\n')
        std_in = self.std_in
        self.std_in.flush()

        std_out = []
        std_err = []
        for line in self.std_out:
            if str(line).startswith(cmd) or str(line).startswith(echo_cmd) or str(line).find(self._input_flag) > 0:
                # filled with shell junk from std_in, skip
                continue
            elif str(line).startswith(finish):
                try:
                    # our finish command ends with the exit status
                    exit_status = int(str(line).rsplit(maxsplit=1)[1])
                    if exit_status:
                        # std_err is combined with std_out.
                        # thus, swap std_err with std_out in a case of failure.
                        std_err = std_out
                        std_out = []
                    break
                except ValueError as e:
                    # raise Exception(f'remote run shell {self.account}@{self.ip}:{self.port} [{cmd}] by ssh fail, '
                                    f'last line is: [{line}], exception is: [{e}], std_out is: [{std_out}]')
                    stderr = f'remote run shell {self.account}@{self.ip}:{self.port} [{cmd}] by ssh fail, ' \
                             f'last line is: [{line}], exception is: [{e}], stdout is: [{stdout}]'
                    stdout = []
                    break
            else:
                # get rid of 'coloring and formatting' special characters
                std_out.append(re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]').sub('', line).
                               replace('\b', '').replace('\r', ''))

        # first and last lines of std_out/std_err contain a prompt
        if std_out and echo_cmd in std_out[-1]:
            std_out.pop()
        if std_out and cmd in std_out[0]:
            std_out.pop(0)
        if std_err and echo_cmd in std_err[-1]:
            std_err.pop()
        if std_err and cmd in std_err[0]:
            std_err.pop(0)

        # remove echo_cmd
        if len(std_out) > 0:
            std_out = std_out[:len(std_out)]
        return std_in, std_out, std_err


if __name__ == '__main__':
    ssh = ShellMan('root', "172.20.0.20", 22, password="Passw0rd")
    stdin, stdout, stderr = ssh.execute("ip a")
    print("-{}-".format(stdout))

FAQ

Python hung死问题

paramiko 依赖 threading.py 问题,调试过程参考:Linux GDB 调试#Python Hung 进程排查示例

远程执行后台脚本异常退出问题

使用 paramiko 封装远程执行比较耗时的命令,期望把命令挂到后台去,但使用如下脚本执行时秒退出:

# 分发 test.sh 文件
ssh.execute('''cat << EOF > /tmp/test.sh
#!/bin/bash
sleep 60
echo "test done"
exit 0
EOF
''')

# 执行
ssh.execute('bash /tmp/test.sh')
  • 解决方式

通过多次封装将任务挂到后台去,

if __name__ == '__main__':
    ssh = ShellMan('root', "172.20.0.20", 22, password="Passw0rd")
    # 分发 test.sh 文件
    _, _, _, = ssh.execute('''cat << EOF > /tmp/test.sh
#!/bin/bash
sleep 60 >> /tmp/sleep.log
exit 0
EOF
''')

    # 分发 run.sh 文件
    _, _, _, = ssh.execute('''cat << EOF > /tmp/run.sh
#!/bin/bash
bash /tmp/test.sh &
echo "done"
exit 0
EOF
''')

    # 执行s
    _, _, _, = ssh.execute('bash /tmp/run.sh 2>&1')
    # _, _, _, = ssh.execute('bash /tmp/run.sh 1>&2')
    print('done')
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数