Python 多线程实现方式
介绍
- Python 的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:
Global Interpreter Lock
,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核
- GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。所以,在Python中,可以使用多线程,但不要指望能有效利用多核
- 协程(Coroutine)
- coroutines是线性处理的,同一时间只有一个coroutine在执行。
- subroutine是coroutine的特例
- subroutine只返回一次,并且不保存调用别人后不保存状态,相反,coroutine保存状态,coroutine可以调用coroutine,是通过yield的方式
- 设计一个支持subroutine的语言只需要预分配一个stack即可,对应的,要支持coroutine的话,就需要预分配多个stack
- coroutine有什么用?以producer和consumer为例,用coroutines会相当高效,都不需要底层thread切换,也不需要经过OS的公共资源,而是直接coroutine间调用就搞定了。
- green thread 是由vm管理的,不是OS管理的,green threads 是仿真了多线程环境,不依赖与底层的OS,是运行在 user space 下。
- 在多核处理器下,native thread 可以自动分配到多核去,而green thread不行
- java 1.1中,green thread是唯一的线程模型,由于green thread比native thread有很多不足,java后续版本抛弃了green thread
- python中的
eventlet
是green thread
实现
- ThreadLocal的好处是让线程有了自己的空间,而不用全局共享的,同时又避免了作为参数传来传去。
- multiprocessing
- 多进程比多线程稳定些,因为多线程如果其中一个线程有问题,有可能导致整个进程有问题
- threadpool
threading
官方标准库
threading.Thread 示例
说明:
# -*- coding: utf-8 -*-
# 创建全局ThreadLocal对象:
import threading
from datetime import datetime
localVal = threading.local()
localVal.val = "Main-Thread"
def print_func():
print(f'{datetime.now()} {localVal.val} in {threading.current_thread().name}')
def main_func(name):
# 赋值
localVal.val = name
print_func()
if __name__ == '__main__':
t1 = threading.Thread(target=main_func, args=('One',), name='Thread-One')
t2 = threading.Thread(target=main_func, args=('Two',), name='Thread-Two')
t1.start()
t2.start()
t1.join()
t2.join()
print(f'{datetime.now()} {localVal.val}')
2017-04-01 11:30:03.366336 One in Thread-One
2017-04-01 11:30:03.366475 Two in Thread-Two
2017-04-01 11:30:03.366545 Main-Thread
说明:
threading.local()
用来保存一个全局变量,但该全局变量只有在当前线程才能访问
localVal.val = name
储存一个变量 name
到当前线程
- 如果在
线程A
里面再次对localVal.val
进行赋值,那么线程A
会单独创建内存空间来存储赋值
- 即在不同的线程里面赋值不会相互覆盖
执行命令获取stdout/stderr
import queue
import subprocess
import threading
import sys
def read_from_stream(stream, output_func, q):
"""
从给定的流中读取数据,并使用指定的函数输出。
"""
result = []
for line in iter(stream.readline, b''):
if line is None:
break
decoded_line = line.decode('utf-8').rstrip() if isinstance(line, bytes) else line.rstrip()
output_func(decoded_line)
result.append(decoded_line)
q.put(result)
def print_stdout(line):
"""
打印标准输出。
"""
print(line)
def print_stderr(line):
"""
打印标准错误输出(到stderr)。
"""
print('stderr', line, file=sys.stderr)
def run_command(command):
"""
运行命令,并启动两个线程来分别监听stdout和stderr。
"""
outq = queue.Queue()
errq = queue.Queue()
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 创建并启动线程
stdout_thread = threading.Thread(target=read_from_stream, args=(process.stdout, print_stdout, outq))
stderr_thread = threading.Thread(target=read_from_stream, args=(process.stderr, print_stderr, errq))
stdout_thread.start()
stderr_thread.start()
# 等待子进程完成
stdout_thread.join(timeout=60)
stderr_thread.join(timeout=60)
# 等待子进程完成
process.wait()
return process.poll(), outq.get(), errq.get()
command = ['ping', '-c', '5', 'xiexianbin.cn']
print(run_command(command))