Python multiprocessing Examples



Introduction

For communication between processes, the multiprocessing module provides Queue and Pipe.

  • os.fork() creates a child process; it returns 0 in the child and the child's PID in the parent (see the sketch after this list)
  • Since Windows does not support fork(), Python provides the multiprocessing module for cross-platform multiprocessing; on Unix it wraps the fork() call, and its Process class represents a single process
    • To create a child process, build a Process instance from a target function and its arguments, then launch it with start()
  • To start a large number of child processes, create them in batches with a process pool: from multiprocessing import Pool
  • multiprocessing.Pipe() is typically used for communication between processes or threads; creating a pipe returns two connection objects representing its two ends
    • os.pipe() is similar, but it creates two file descriptors, one for reading and one for writing, for one-way communication
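
A minimal sketch of the os.fork() branching described above (Unix-only; the printed messages are illustrative):

import os

pid = os.fork()  # duplicate the current process
if pid == 0:
    # fork() returned 0: this branch runs in the child
    print(f'child: pid is {os.getpid()}')
    os._exit(0)  # leave the child without running parent cleanup
else:
    # fork() returned the child's PID: this branch runs in the parent
    print(f'parent: pid is {os.getpid()}, child pid is {pid}')
    os.waitpid(pid, 0)  # reap the child to avoid a zombie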

Examples

multiprocessing.Pipe

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process, Pipe


def send(pipe):
    pipe.send('i am something ...')  # send a string to the other end
    print(f'send got: {pipe.recv()}')  # then wait for the other end's message
    pipe.close()


def recv(pipe):
    pipe.send(dict(name='xianbin', age=18))  # each end of a duplex pipe can send
    print(f'recv got: {pipe.recv()}')
    pipe.close()


if __name__ == '__main__':
    con1, con2 = Pipe()  # a duplex pipe: both ends can send and receive
    sender = Process(target=send, name='send', args=(con1,))
    sender.start()

    recver = Process(target=recv, name='recv', args=(con2,))
    recver.start()

    # wait for both child processes to finish
    sender.join()
    recver.join()
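
Pipe() is duplex by default, which is why both ends above can send and receive; Pipe(duplex=False) instead returns a pair where the first connection can only receive and the second can only send.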

multiprocessing.Pool

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
import random
from multiprocessing import Pool


def sub_task(name):
    spid = os.getpid()
    print('run sub task name is {}, sub pid is {}...'.format(name, spid))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('sub task %s runs %0.2f seconds.' % (name, (end - start)))
    return spid


if __name__ == '__main__':
    print('main process %s.' % os.getpid())
    p = Pool(4)
    results = []
    for i in range(10):
        r = p.apply_async(sub_task, args=(i,))
        results.append(r)
    print('Waiting for all subprocesses done...')

    for r in results:
        print("-- result {}".format(r.get()))
    p.close()
    p.join()
    print('All subprocesses done.')
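
Pool also accepts an initializer callable that runs once in each worker process as it starts; the example below uses it, together with initargs, to load environment variables into every worker:
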
import os
import multiprocessing

def worker_function(arg):
    # read the environment variable loaded by the initializer
    print(os.environ["MY_ENV_VAR"])

def initializer(env):
    # load the environment variables into this worker's environment
    os.environ.update(env)

if __name__ == "__main__":
    # set the environment variables in the main process
    env = {"MY_ENV_VAR": "my_value"}

    # start the pool; initializer runs once in each worker with initargs
    pool = multiprocessing.Pool(processes=4, initializer=initializer, initargs=(env,))

    # submit the worker function to the pool
    pool.map(worker_function, range(10))

    # shut the pool down
    pool.close()
    pool.join()
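
Because initializer runs once inside every worker regardless of the start method, this pattern also works under spawn (the default on Windows and macOS), where workers do not inherit the parent's globals via fork.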

multiprocessing.Queue

import time

from multiprocessing import Process, Queue


def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break


def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")


def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_p() reads from qq as a separate process...
        ###    you can spawn as many reader_p() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=(qq,))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs


if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        _start = time.time()  # start timing before the integers are queued
        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")

Shared variables

from multiprocessing import Pool, Manager

def worker(d, lock):
    with lock:
        d['count'] += 1

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        d['count'] = 0
        lock = manager.Lock()

        with Pool(processes=4) as pool:
            for _ in range(1000):
                pool.apply_async(worker, args=(d, lock))
            pool.close()
            pool.join()

        print(d['count'])  # should print 1000, since worker() was called 1000 times
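
For a single shared counter, a Manager proxy is heavier than necessary. Below is a minimal sketch using multiprocessing.Value, whose synchronized wrapper carries its own lock (the worker function and process count here are illustrative):

from multiprocessing import Process, Value

def add_250(counter):
    for _ in range(250):
        with counter.get_lock():  # Value's built-in lock guards the update
            counter.value += 1

if __name__ == "__main__":
    counter = Value('i', 0)  # a shared C int, initially 0
    procs = [Process(target=add_250, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4 * 250 = 1000

Because Value lives in shared memory, each update avoids the proxy round-trip that a Manager dict performs.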

Extensions

concurrent.futures

  • Error log:
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
  • Fix: make sure the executor is created and used inside the if __name__ == '__main__': guard, so that newly spawned workers can re-import the module safely (see the sketch below)
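
A minimal sketch of the guarded pattern, assuming an illustrative square() task:

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':
    # creating the executor under the guard keeps spawned workers
    # from re-executing this block when they re-import the module
    with ProcessPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(square, range(10))))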