Python Multiprocessing Examples
Introduction
- For communication between processes, the multiprocessing module provides Queue and Pipe.
- os.fork() creates a child process: if the return value is 0 you are in the child process, otherwise in the parent.
- Because the Windows platform does not support fork, Python provides the multiprocessing module for cross-platform multiprocessing. It wraps the fork() call and provides the Process class to represent a process.
- To create a child process, just create a Process instance with a target function and the function's arguments, then launch it with start() (see the minimal sketch after this list).
- To launch a large number of child processes, create them in batches with a process pool: from multiprocessing import Pool.
- multiprocessing.Pipe() is generally used for communication between processes or threads. Creating a pipe returns two connection objects, one for each end of the pipe.
- The similar os.pipe() instead creates two file descriptors, one for reading and one for writing, giving one-way communication (see the sketch after the Pipe example below).
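A minimal sketch of the Process / start() pattern described above; the greet function and its argument are illustrative only:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from multiprocessing import Process

def greet(name):
    # runs in the child process
    print('hello {}, child pid is {}'.format(name, os.getpid()))

if __name__ == '__main__':
    print('parent pid is {}'.format(os.getpid()))
    p = Process(target=greet, args=('xianbin',))
    p.start()  # launch the child process
    p.join()   # wait for the child to finish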
Examples
multiprocessing.Pipe
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Pipe

def send(pipe):
    # send a message down one end of the pipe, then read the reply
    pipe.send('i am something ...')
    print(f'send got: {pipe.recv()}')
    pipe.close()

def recv(pipe):
    # send a dict down the other end, then read what the peer sent
    pipe.send(dict(name='xianbin', age=18))
    print(f'recv got: {pipe.recv()}')
    pipe.close()

if __name__ == '__main__':
    (con1, con2) = Pipe()  # the two connection objects are the two ends of the pipe
    sender = Process(target=send, name='send', args=(con1,))
    sender.start()
    recver = Process(target=recv, name='recv', args=(con2,))
    recver.start()
    sender.join()
    recver.join()
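For contrast, a minimal sketch of the os.pipe() / os.fork() approach mentioned in the introduction: two file descriptors, one read end and one write end, one-way communication. POSIX only, so it will not run on Windows.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os

r, w = os.pipe()  # two file descriptors: read end, write end
pid = os.fork()   # returns 0 in the child, the child's pid in the parent
if pid == 0:
    os.close(w)   # child only reads
    data = os.read(r, 1024)
    print('child got: {}'.format(data.decode()))
    os.close(r)
    os._exit(0)
else:
    os.close(r)   # parent only writes
    os.write(w, b'hello from parent')
    os.close(w)
    os.waitpid(pid, 0)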
multiprocessing.Pool
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import random
from multiprocessing import Pool

def sub_task(name):
    spid = os.getpid()
    print('run sub task name is {}, sub pid is {}...'.format(name, spid))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('sub task %s runs %0.2f seconds.' % (name, (end - start)))
    return spid

if __name__ == '__main__':
    print('main process %s.' % os.getpid())
    p = Pool(4)  # at most 4 worker processes run concurrently
    results = []
    for i in range(10):
        r = p.apply_async(sub_task, args=(i,))  # submit tasks without blocking
        results.append(r)
    print('Waiting for all subprocesses done...')
    for r in results:
        print("-- result {}".format(r.get()))  # get() blocks until the task finishes
    p.close()
    p.join()
    print('All sub processes done.')
Pool with an initializer: load environment variables into the worker processes
import os
import multiprocessing

def worker_function(arg):
    # read the environment variable loaded by the initializer
    print(os.environ["MY_ENV_VAR"])

def initializer(env):
    # copy the variables into this worker process's environment
    os.environ.update(env)

if __name__ == "__main__":
    # set the environment variables in the main process
    env = {"MY_ENV_VAR": "my_value"}
    # start the pool; the initializer runs once in each worker
    pool = multiprocessing.Pool(processes=4, initializer=initializer, initargs=(env,))
    # submit the work function to the pool
    pool.map(worker_function, range(10))
    # shut the pool down
    pool.close()
    pool.join()
multiprocessing.Queue
import time
from multiprocessing import Process, Queue

def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue. A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue
    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_proc() reads from qq as a separate process...
        ### you can spawn as many reader_proc() as you like,
        ### however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=(qq,))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_proc() as another proc
        all_reader_procs.append(reader_p)
    return all_reader_procs

if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)
        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader procs
        print("All reader processes are pulling numbers from the queue...")
        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print(" Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish
            print(" reader_p() idx:%s is done" % idx)
        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")
Shared variables
from multiprocessing import Pool, Manager

def worker(d, lock):
    with lock:  # serialize access to the shared dict
        d['count'] += 1

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()     # dict proxy shared across processes
        d['count'] = 0
        lock = manager.Lock()  # lock proxy shared across processes
        with Pool(processes=4) as pool:
            for _ in range(1000):
                pool.apply_async(worker, args=(d, lock))
            pool.close()
            pool.join()
        print(d['count'])  # should print 1000, since worker() was called 1000 times
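For a single shared counter, multiprocessing.Value is a lighter-weight alternative to a Manager; the sketch below repeats the same counting scenario with plain Process workers (an illustrative addition, not part of the original example):
from multiprocessing import Process, Value, Lock

def worker(counter, lock):
    for _ in range(250):
        with lock:  # serialize increments
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # shared integer living in shared memory
    lock = Lock()
    procs = [Process(target=worker, args=(counter, lock)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4 processes * 250 increments = 1000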
Further notes
concurrent.futures
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
- Fix: create and use the pool/executor inside the if __name__ == '__main__': block (see the sketch below), so that worker processes started by re-importing the module do not run the pool-creation code again.
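A minimal sketch of that fix with concurrent.futures; the square function is illustrative:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x, os.getpid()

if __name__ == '__main__':
    # Creating and using the executor inside this guard avoids BrokenProcessPool
    # under the spawn start method (e.g. on Windows), because each worker process
    # re-imports this module on startup.
    with ProcessPoolExecutor(max_workers=4) as executor:
        for value, pid in executor.map(square, range(5)):
            print(value, pid)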