多进程编程及进程间的通信、数据传输python代码实现
多进程编程及进程间的通信
- 意义:充分利用计算机的资源提高程序的运算速率
- 定义:通过应用程序利用计算机多个核心达到同时执行多个任务的目的,以此提高计算机的运行速率
- 实施方案:多进程 多线程
- 并行: 计算机同时处理多个任务
- 并发:同时处理多个任务,内核在不断的任务间小虎切换,达到好像还都在处理运行的效果,但是实际是一个时间点内核只能处理其中一个任务
多进程的优缺点
- 优点
- 可以使用计算机多核,进行任务的并发执行,提高执行效率
- 空间独立,数据安全
- 运行不受其他进程影响,创建方便
- 缺点
- 进程的删除和创建消耗的系统资源多
进程(process)
查看进程:ps -aux
查看进程树:pstree
查看父进程:ps -ajx
标志 | 名称 | 说明 |
---|---|---|
S | 等待态 | 可中断等待 |
D | 等待态 | 不可中断等待 |
T | 等待态 | 暂停状态 |
R | 运行态 | 包含就绪状态 |
Z | 僵尸进程 | |
< | 高优先级 | |
N | 优先级较低 | |
l | 有子进程的 | |
s | 会话组组长 | |
+ | 前台进程 |
三态
- 就绪态:进程具备执行条件,等待系统分配资源
- 运行态:进程占有CPU,处于运行状态
- 等待态:进程暂时不具备执行条件,阻塞等待,满足条件后再执行
五态(三态的基础上增加了新建态和终止态)
- 新建态:创建一个新的进程,获取资源的过程
- 终止态:进程执行结束,资源释放回收的过程
进程优先级
- 作用:决定了一个进程的执行权限
- 动态查看系统中的进程信息:
top
,用< , >
翻页- 取值范围:-20 -- 19 -20优先级最高
- 使用指定的优先级运行程序
nice
: 指定运行的优先级e.g. :nice -9 ./while.py
--->>以优先级-9运行
进程特征
- 进程之间运行互不影响,各自独立运行
- 进程是操作系统资源分配的最小单位
- 每个进程空间独立,各自占有一定的虚拟内存
孤儿进程
- 父进程先于子进程退出,此时子进程就称为孤儿进程
- 孤儿进程会被操作系统指定的进程收养,系统进程就成为了孤儿进程的父进程
僵尸进程
- 子进程先于父进程退出,但是父进程没有处理子进程的退出状态,此时子进程就会成为僵尸进程
- 僵尸进程会存留少量的PCB信息在内存中,大量的僵尸进程会消耗系统资源,应该避免僵尸进程的产生
- 如何避免僵尸进程的产生
- 处理子进程的退出状态
pid, status = os.wait()
功能:在父进程中堵塞等待子进程退出
返回值:
- 处理子进程的退出状态
要求理解
- 什么是进程?
- 了解进程特征
- 清楚进程每种状态,
多进程编程
1
2
3 # 功能 : 创建新的进程
4 # 参数 : 无
5 # 返回值 : 失败--->返回一个负数
6 # 成功--->在新的进程中返回 ‘0’
7 # 在原有的进程中返回新的进程的PID号
8 import os
9 pid = os.fork()
10 pid = os.fork()
11 print(pid)
12
13 '''打印
14 9352
15 0
16 9353
17 '''
- 子进程会复制父进程的全部代码,包括fork之前产生的内存空间
- 子进程从fork的下一句开始执行,与父进程互不干扰
- 父子进程的执行顺序不一定,父子进程公用一个终端显示
- 父子进程通常会根据fork返回值的差异选择执行不同的代码。所以if结构几乎是fork的标配
- 父子进程空间独立,操作都是本空间的内容,互不影响
- 子进程也有自己的特性,比如PID号,PCB,命令集等。
进程相关的函数
函数方法 | 参数 | 说明 |
---|---|---|
os.getpid() | 返回当前进程的PID号 | |
os.getppid() | 返回当前进程的父进程的PID号 | |
os._exit( status ) | 程序的退出状态 | 进程退出 |
sys.exit( [ status ] ) | 数字:表示退出状态,不写默认为 | 进程退出 |
import os
pid = os.fork()
if pid < 0:
print("创建进程失败")
elif pid == 0:
print("子进程我的真实PID为:",os.getpid(),"我的父进程PID为:",os.getppid())
else:
print("我是父进程执行的代码,当前的变量pid为:",pid,"我的真实PID为:",os.getpid())
'''打印内容''''''
我是父进程执行的代码,当前的变量pid为: 10992 我的真实PID为: 10991
子进程我的真实PID为: 10992 我的父进程PID为: 10991
''''''打印内容'''
# 如果pid 10992和子进程真实PID不同,那么这个子进程就变成了孤儿进程
多进程模块
import multiprocessing
from multiprocessing import Process
Process()
- 功能:创建一个进程对象
- 参数
- name:进程名称
- target:绑定函数
- args:元组,给target函数按照位置传参
- kwargs:字典,给target函数按照键值对传参
- name: 字符串,新的进程的名字
- 例如:p = Process(target = fun,args=(a,b))
函数方法 | 说明 |
---|---|
p.start() | 启动进程,target函数自动执行,此时进程被真正创建 |
p.join([timeout]) | 阻塞等待回收子进程,timeout为超时时间 |
p.is_alive() | 判断进程生命周期状态,处于生命周期,返回布尔值 |
p.name() | 获取进程名称 |
p.pid() | 获取进程 的pid |
p.daemon() | 默认状态False,主进程退出不影响子进程。True :子进程随着主进程结束 |
- 使用multiprocessing创建子进程,同样子进程复制父进程的全部代码,父子进程各自执行互不影响,父子进程有各自的运行空间
- 如果不使用join挥手子进程则子进程退出后会成为僵尸进程
from multiprocessing import Process
from time import sleep
#带参数的进程函数
def worker(sec,name):
for i in range(3):
sleep(sec)
print("I'm %s"%name)
print("I'm working...")
p = Process(target = worker,args = (2,),\
kwargs = {'name':'Daivl'},name = "Worker")
p.start()
print("Process name:",p.name) #进程名称
print("Process PID:",p.pid) #获取进程PID
#进程alive情况
print("Process is alive:",p.is_alive())
p.join(3)
print("==================")
创建自定义继承Process类
- 继承Process
- 编写自己的
__init__
,同时加载父类的__init__
方法 - 重写
run
方法,可以通过生成的对象调用start自动运行
from import Process
import time
class ClockProcess(Process):
def __init__(self,value):
self.value = value
super(ClockProcess,self).__init__()
def run(self):
for i in range(5):
print("现在的时间是",time.ctime())
time.sleep(self.value)
# 创建自定义进的类的对象
p =ClockProcess(2)
# 自动调用run
p.start()
p.join()
进程池技术
- 产生原因
- 如果有大量任务需要多进程完成。则需要频繁的创建删除进程,给计算机带来较多的资源消耗
- 原理
- 创建适当的进程放入进程池,用来池里待处理的时间,处理完当前任务后进程不销毁,仍然哎进程池等待处理其他时间,进程的复用降低了系统资源的消耗
- 使用方法
- 创建进程池,在池内放入适当的进程
- 将事件加入到事件等待队列
- 不断取进程执行时间,直到所有进程执行完毕
- 关闭进程池,回收进程
Pool函数
- Pool(Processes)
- 功能:创建进程池对象
- 表示进程池中有多少进程
- pool.apply_async(func, args, kwds)
- 功能:将时间放入进程池队列
- 参数
- func:事件函数
- args:以元组形式给func传参
- kwds : 以字典形式给func传参
- 返回值一个对象
- pool.apply(func, args, kwds)
- 功能:将时间放入进程池队列
- 参数
- func:事件函数
- args:以元组形式给func传参
- kwds : 以字典形式给func传参
- pool.close()
- 功能:关闭进程池
- pool.join()
- 功能:回收进程池
- pool.map(func,iter)
- 功能:将要做的事件放入进程池
- 参数
- func : 要执行的函数
- iter : 迭代对象
- 返回值:多次return的结果列
from multiprocessing import Process
import time
class ClockProcess(Process):
def __init__(self,value):
self.value = value
super(ClockProcess,self).__init__()
def run(self):
for i in range(5):
print("现在的时间是",time.ctime())
time.sleep(self.value)
# 创建自定义进的类的对象
p =ClockProcess(2)
# 自动调用run
p.start()
p.join()
进程间的通讯(IPC)
进程间通讯的方法有:管道,消息队列,共享内存,信号,信号量,套接字
管道 | 消息队列 | 共享内存 | |
---|---|---|---|
开辟空间 | 内存 | 内存 | 内存 |
读写方式 | 两端读写[双向/单向] | 先进先出 | 覆盖之前的内容 |
效率 | 一般 | 一般 | 较高 |
应用 | 多用于父子进程 | 广泛灵活 | 需要注意互斥 |
管道通讯
通信原理:在内存中开辟管道空间,生成管道操作对象,多个进程使用“同一个”管道对象进行操作,即可实现通信
multiprocessing --> Pipe
fd1,fd2 = Pipe(duplex = True)
- 功能:创建管道
- 参数
- 默认表示为双向管道
- 设置False则为单向管道
- 返回值
- 如果是双向管道则都可以读写
- 如果是单向管道,则fd1只读,fd2只写
fd.recv()
- 功能:从管道读取信息
- 返回值:读取到的内容
- 如果管道为空则堵塞
fd.send(data)
- 功能:向管道中写入内容
- 参数:要写入的内容
- 可以发送任意Python数据类型
多进程管道传输数据示例
from multiprocessing import Process,Pipe
import time,os
# 创建管道对象
fd1, fd2 = Pipe()
def fun(name):
time.sleep(1)
fd2.send(os.getppid())
jobs = []
# 创建5个子进程
for i in range(5):
p = Process(target = fun,args = (i,))
jobs.append(p)
p.start()
for i in range(5):
data = fd1.recv()
print(data)
for i in jobs:
i.join()
消息队列
队列:先进先出,按照顺序来
通信原理:在内存中建立队列数据结构模型。多个进程都可以通过队列存入内容,取出内容的顺序和存入的顺序保持一致
- 创建队列
q = Queue(maxsize = 0)
- 功能:创建队列消息
- 参数:表示最多存储多少消息。默认表示根据内存分配存储
- 返回值:队列对象
q.put(data, [block, timeout])
- 功能:像队列存储消息
- 参数
- data:要存的内容
- block:默认队列满时候会堵塞,设置False则非堵塞
- timeout:超时时间
data = q.get([block, timeout])
- 功能:获取队列消息
- 参数
- block:默认设置队列空时会堵塞,设置为False则非堵塞
- timeout:超时时间
- 返回值:返回取出的内容
q.full()
:判断队列是否为满q.empty()
:判断队列是否为空q.size()
:判断队列中的消息数量q.close
:关闭队列
单进程示例
#!《 单进程 》
from multiprocessing import Queue
from time import sleep
# 创建队列,可以放3条消息
q = Queue(3)
# 存一条消息
q.put(1)
sleep(0.5)
# 判断队列是否为空
print(q.empty())
q.put(2)
# 判断队列是否满
print(q.full())
q.put(3)
# 输出队列消息数量
print(q.qsize())
# 输出
print(q.get())
q.close()
多进程消息队列传递数据
from multiprocessing import Queue,Process
from time import sleep
# 创建队列,可以放3条消息
q = Queue(3)
def fun1():
sleep(1)
q.put({"a":1,"b":2})
def fun2():
sleep(2)
print("收到消息",q.get())
p1 = Process(target = fun1)
p2 = Process(target = fun2)
p1.start()
p2.start()
p1.join()
p2.join()
共享内存
通信原理:在内存中开辟一段空间,对多个进程可见,进程可以写入可以读,但是每次写入的内容会覆盖上一次的内容。
只能进行单个数据的发送
obj = Value(ctype, obj)
- 功能:开辟共享内存空间
- 参数
- ctype:要存储的数据类型
- obj:共享内存的初始数据
- 返回值:共享内存对象
obj.value
即共享内存值,对其修改即可修改内存-
from multiprocessing import Value from time import sleep import os # 创建共享内存对象 money = Value('i',2000) # 操作共享内存 def deposite(): while True: i = int(input("请输入:")) money.value = i sleep(0.05) def withdraw(): data = money.value while True: if data != money.value : data = money.value print(data) pid = os.fork() if pid == 0 : deposite() withdraw()
obj = Array(ctype, obj)
- 功能:开辟共享内存空间
- 参数:
- ctype:要存入的数据格式
- obj:初始化存入的内容,比如列表、字符串。如果是数字则代表开辟内存空间的个数
- 返回值:返回共享内存对象,可以遍历
-
from multiprocessing import Array,Process from time import sleep import os # 开辟100字符内存空间,'c'代表字符,'i'代表整形 shm = Array('c',100) # 必须使用字节流 shm.value = "哈哈哈".encode() def fun1(): print(os.getpid(),"子进程1:",shm.value.decode()) shm.value = "夜夜夜".encode() def fun2(): sleep(1) print(os.getpid(), "子进程2:",shm.value.decode()) p1 = Process(target = fun1) p2 = Process(target = fun2) p1.start() p2.start() p1.join() p2.join()
信号通信
一个进程向另一个进程发送一个信号来传递某种信息,接受者根据传递的信息来做相应的事
$ kill -l
查看系统信号说明
$ kill -9 pid号
对进程发送信号
信号名称 | 说明 | ||
---|---|---|---|
1) SIGHUP | 连接断开 | ||
2) SIGINT | ctrl+c | ||
3) SIGQUIT | ctrl+\ | ||
20) SIGTSTP | ctrl+z | ||
9) SIGKILL | 终止进程 | ||
19) SIGSTOP | 暂停进程 | ||
26) SIGVTALRM | 时钟信号 | ||
17) SIGCHLD | 子进程退出时给父进程发的信号 | ||
在Python中import signal
可以获取信号
os.kill(pid, sig)
- 功能:发送信号
- 参数
- pid:要发送信号的PID号
- sig :信号名称
import os
import signal
os.kill(12345,signal.SIGKILL) #杀死进程
THE END