网络编程并发 多进程 进程池,互斥锁,信号量,IO模型

2023-03-16,,

进程:程序正在执行的过程,就是一个正在执行的任务,而负责执行任务的就是cpu

操作系统:操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序。

操作系统的作用:

1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口

2:管理、调度进程,并且将多个进程对硬件的竞争变得有序。

多道技术产生的背景:针对单核,实现并发

多道技术:多道技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用。,

时间复用:当一个程序在等待I/O时,另一个程序可以使用cpu。提高CPU的利用率。

空间复用:同一时间内存可以缓存更多的程序。

进程

创建进程有方法

方法一:利用subprocess 模块,具体方法见网络编程模块。subprocess模块有很大的局限性。) 我们总是让subprocess运行外部的程序,而不是运行一个Python脚本内部编写的函数。2) 进程间只通过管道进行文本交流。以上限制了我们将subprocess包应用到更广泛的多进程任务。

方法二:利用multiprocessing模块,这个模块叫多进行模块管理包,multiprocessing中的Process中的类可以创建一个子进程。

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
一般需要指定的参数为target,表示为一个函数,这个函数必须是可调用的,
args表示为target的参数,是一个元组的格式。注意在只有一个参数的时候,必须写上逗号,表示为一个tuple
kwargs表示为target的参数,是一个字典的格式。

注意:在Windows系统中开子进程的代码必须要写在if __name__="__main__" 下。mac笔记本不需要因为这两种系统开启子进程的方式不一样

范例

from multiprocessing import Process
import time
def task(name):
print("%s is runing"%name)
time.sleep(2)
print("%s is done"%name)
if __name__=="__main__":
p=Process(target=task,args=("egon",))
p.start()#只是给操作系统发了个信号,让操作系统开进程(同时开辟一块内存空间,拷贝父进程的地址空间)
print("你好,中国")

结果:

你好,中国
egon is runing
egon is done

子进程回收机制:进程由谁开,由谁回收,等子程序完成后,才由父进程回收

p.start()#只是给操作系统发了个信号,让操作系统开一个进程(同时开辟一块内存空间,拷贝父进程的地址空间),同时我们还要知道start方法是一个异步非阻塞,因为当执行start方法的时候,并不会阻塞当前进程去等待子进程执行完,
而是会继续执行主进程的代码 p.join() :阻塞当前进程,直到调用join方法的那个进程执行完,再继续执行当前进程。也就是说主进程等子进程完了,自己再进行进程。join()方法是一个同步阻塞 p.pid 子进程id p.terminate() 强制结束进程
p..is_alive() 查看该进程是否活着

python代码由python解释器解释运行,你启用的python进程其实是运用了python exe的进行。

os .getppid()查看父进程的ID

os.getpid()获取当前的进程的ID。

方法三:

from multiprocessing import Process

class MyProcess(Process):
def __init__(self, n):
self.n = n
super(MyProcess, self).__init__() # 使用父类的方法才能传参 def run(self):
"""一定要有这个函数"""
print(self.n)
print(os.getppid(), os.getpid()) print("主进程", os.getppid(), os.getpid())
my = MyProcess("小红")
my.start()

结果:

主进程 241 6452
小红
6452 6454

进程必须要知道的事情

 1.进程之间内存相互隔离

举例

from multiprocessing import Process
n=1
def f():
global n
n=13
if __name__ == '__main__':
p =Process(target=f)
p.start()
p.join()
print(n)

结果:

 1

结论:子进程不能修改主进程的变量,用global的作用就是看是否会修改全局变量,因为在单进程中可以修改。

2.子进程用主进程的值来作为自己的初始值。

n=1
def f():
print(n)
if __name__ == '__main__':
p =Process(target=f)
p.start()

结果:

1
1

3.不能获得子进程的返回值,因为进程之间是隔离的 

创建并发进程服务器

进程之间的通信(IPC)

由于我们知道进程之间内存是隔离的,但是可以通过别的方法实现进程间的通信

大体有两个方面:

    基于文件 适合于同一台机器上多进程进行通信.基于socket的文件级别的通信
    基于网络 消息中间件(redis,rabbitmq,memcach)

以下是基于文件的实现的进程之间的通信
1.消息队列 from multiprocessing import Queue ,一个进程向队列里放东西,另一个进程向队列中拿东西
  特点:

    数据安全.因为管道加锁
    可以实现进程之间的通信
    先进先出

实现原理:管道+加锁

2.管道是一种半双工的通信方式,管道是指用于连接一个读进程和一个写进程以实现她们之间通信的一个文件。写进程将数据以字符流的形式放入管道,读进程则从管道中接收数据。要实现双向通信必须要创建两个管道.数据不安全,需要加锁
 

第一: 消息队列

在进程中引用队列的方式:

from multiprocessing import Queue

在线程中引用队列的方式:

import queue

举例子:

from multiprocessing import Process,Queue
import os,time,random
def write(q):
"""
一个进程往队列中放东西
:param q:
:return:
"""
for value in ["A","B","C"]:
print('put %s to queue...'%value)
q.put(value)#往里面放东西
time.sleep(random.random())
def read(q):
"""
另一个进程从队列中拿数据
:param q:
:return:
"""
while True:
if not q.empty():
value=q.get(True)#从里面拿东西
print('GET %s from queue'%value)
time.sleep(random.random())
else:
break
if __name__ == '__main__':
q=Queue()#可以不设置大小,受限于内存的大小
pw=Process(target=write,args=(q,))
pr=Process(target=read,args=(q,))
pw.start()
pw.join()
pr.start()
pr.join()
print('所有的数据都拿完了')

结果:

put A to queue...
put B to queue...
put C to queue...
GET A from queue
GET B from queue
GET C from queue
所有的数据都完了

2管道

import multiprocessing

def processFun(conn, name):
print(multiprocessing.current_process().pid, "进程发送数据:", name)
conn.send(name) if __name__ == '__main__':
# 创建一个管道,一个管道有两个口,一个发一个收
conn1, conn2 = multiprocessing.Pipe()
# 创建子进程
process = multiprocessing.Process(target=processFun, args=(conn1, "http://c.biancheng.net/python/"))
# 启动子进程
process.start()
process.join()
print(multiprocessing.current_process().pid, "接收数据:")
print(conn2.recv())

结果:

7840 进程发送数据: http://c.biancheng.net/python/
7839 接收数据:
http://c.biancheng.net/python/

详情见:

https://www.cnblogs.com/Nicholas0707/p/10787945.html

进程间数据共享

共享内存,正常情况下,每个进程都拥有自己的内存空间,因此进程间的内存是无法共享的它。在内存中划出了一块共享内存取,进程可以通过对该共享区的读或写交换信息,实现通信
允许多个进程访问相同的内存,一个进程改变其中的数据后,其他的进程都可以看到数据的变化.python中提供了两种方法,也就是multiprocessiing中的两个类 array和Manger
代价就是运行时间慢,数据不安全,需要加锁
间博客https://www.cnblogs.com/Eva-J/p/5110844.html

第二种共享内存

from multiprocessing import Manager, Lock, Process

def change_dict(dic, lock):
with lock: # 加锁,时间慢
dic["count"] -= 1 if __name__ == '__main__':
m = Manager()
lock = Lock()
dic = m.dict({"count": 100})
p_l = []
for i in range(100):
p = Process(target=change_dict, args=(dic, lock))
p.start()
p_l.append(p) for k in p_l: k.join() #为了使所有的进程运行完
print(dic['count'])

结果:

 0

守护进程

见:https://www.cnblogs.com/sticker0726/p/7943412.html

进程池

开多进程是为了并发,通常有几个cpu核心就开几个进程,但是进程开多了会影响效率,主要体现在切换的开销,所以引入进程池限制同时开进程的数量。

注意:比如说你有10个任务,pool(4)表示:我只开4个进程,10个任务由这4个进程执行,而不是开10个不同的进程,这点要注意

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

有两个可以实现进程池

1.from multiprocessing import Pool     这个不推荐用,不好用,太麻烦了,在这里只作为了解

2.from concurrent.futures import ProcessPoolExecutor   推荐使用这个

from multiprocessing import Pool
import time,os,random def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__ == '__main__':
print('parent process %s...'%os.getpid())
p=Pool(4) #Pool的默认大小时CPU的核数
for i in range(12):
p.apply_async(long_time_task,args=(i,))
print('等待所有子进程运行')
p.close() #线程池用完后别忘记关闭
p.join()
print('所有的进程都已经运行完毕')

结果:

parent process 11448...
等待所有子进程运行
Run task 0 (6368)...
Run task 1 (7564)...
Run task 2 (10244)...
Run task 3 (18544)...
Task 2 runs 0.19 seconds.
Run task 4 (10244)...
Task 1 runs 0.66 seconds.
Run task 5 (7564)...
Task 0 runs 0.82 seconds.
Run task 6 (6368)...
Task 4 runs 1.48 seconds.
Run task 7 (10244)...
Task 3 runs 2.37 seconds.
Run task 8 (18544)...
Task 6 runs 1.98 seconds.
Run task 9 (6368)...
Task 5 runs 2.24 seconds.
Run task 10 (7564)...
Task 9 runs 0.93 seconds.
Run task 11 (6368)...
Task 7 runs 2.45 seconds.
Task 10 runs 2.05 seconds.
Task 11 runs 1.28 seconds.
Task 8 runs 2.68 seconds.
所有的进程都已经运行完毕

concurrent.futures模块

concurrent.futures模块是python3提供的一个异步并发模块,主要用来实现多进程和多线程的异步并发.

提交内容的方式有两种:

同步调用:提交/调用一个任务,然后就在原地等着,等到该任务执行完毕拿到返回结果,再执行下一步代码。老版本才有,一般不用。同步提交任务会导致程序串行,也就是开多线程就没有意义了.

异步调用:提交/调用一个任务,不在原地等着前一个任务执行完,就直接执行下一个任务。可以实现并发的效果。其实异步的前提是两个任务没有关系,下一个任务不需要上一个任务的结果,就可以用异步。

同步调用范例:

from concurrent.futures import ProcessPoolExecutor
import time,random,os def piao(i):
print('线程%s is 正在运行'%os.getpid())
time.sleep(random.randint(1,4))
return i
if __name__ == '__main__':
p=ProcessPoolExecutor(4) #创建进程池
for i in range(10):
a=p.submit(piao,i) #把进程放到进程池,这是同步提交任务的方式,即拿到结果后才执行下一行代码
print('任务结果为:%s'%(a.result())) #获取返回值的方式p.result
p.shutdown() #等所有的进程运行完了再执行主进程相当于join(),在这期间不能再添加新进程
print('主进程%s'%os.getpid())

结果体现了同步提交的方式,你可以发现他是顺序执行的

进程13128 is 正在运行
任务结果为:0
进程6008 is 正在运行
任务结果为:1
进程5108 is 正在运行
任务结果为:2
进程8280 is 正在运行
任务结果为:3
进程13128 is 正在运行
任务结果为:4
进程6008 is 正在运行
任务结果为:5
进程5108 is 正在运行
任务结果为:6
进程8280 is 正在运行
任务结果为:7
进程13128 is 正在运行
任务结果为:8
进程6008 is 正在运行
任务结果为:9
主进程11900

结果

异步调用范例;

from concurrent.futures import ProcessPoolExecutor
import time,random,os def piao(i):
print('线程%s is 正在运行'%os.getpid())
time.sleep(random.randint(1,4))
return i
if __name__ == '__main__':
p=ProcessPoolExecutor(4) #创建进程池
objs=[]
for i in range(10):
print('该进程是>>>%s' % (os.getpid()))
a=p.submit(piao,i) #把进程放到进程池,这是同步提交任务的方式,即拿到结果后才执行下一行代码
objs.append(a)
for obj in objs:
print('该进程是----------%s' % (os.getpid()))
print('任务结果为:%s' % (obj.result())) # 获取返回值的方式p.result
p.shutdown() #等所有的进程运行完了再执行主进程相当于join(),在这期间不能再添加新进程
print('主进程%s'%os.getpid())

结果:

该进程是----------18908
进程14424 is 正在运行
进程2612 is 正在运行
进程5076 is 正在运行
进程17496 is 正在运行
进程5076 is 正在运行
进程2612 is 正在运行
进程17496 is 正在运行
进程14424 is 正在运行
任务结果为:0
该进程是----------18908
任务结果为:1
该进程是----------18908
任务结果为:2
该进程是----------18908
任务结果为:3
该进程是----------18908
任务结果为:4
进程5076 is 正在运行
该进程是----------18908 ####这里和主进程的id一样
进程17496 is 正在运行
任务结果为:5
该进程是----------18908
任务结果为:6
该进程是----------18908
任务结果为:7
该进程是----------18908
任务结果为:8
该进程是----------18908
任务结果为:9
主进程18908

研究了半天终于明白了, 这个异步,其实把取结果的这步放在了主进程中,这样看起来就是异步调用了

异步提交的回调机制

异步调用真正的概念:  提交完任务(为该任务绑定一个回调函数)后,不用在原地等待该任务执行完拿到结果,就直接提交下一个任务.一个任务一旦执行完任务就会自动调用回调函数.

我们把上边异步的例子修改一下,把取结果这一步,写成一个函数get_value,当一个进程得到结果后会自动调用add_done_calkback()函数用来通知get_value函数让它取对象

from concurrent.futures import ProcessPoolExecutor
import time,random,os def piao(i):
print('进程%s is 正在运行'%os.getpid())
time.sleep(random.randint(1,4))
return i
def get_value(obj):
  #这里的obj就等于p.submit(piao,i)的返回值
print('任务结果为:%s' % (obj.result())) if __name__ == '__main__':
p=ProcessPoolExecutor(4) #创建进程池
for i in range(10):
p.submit(piao,i).add_done_callback(get_value) #当该进程有结果了,就会自动调用回调函数,回调函数会通知get_value函数,让它取结果,注意被通知的函数的参数就是回调函数前边的结果.
p.shutdown() #等所有的进程运行完了再执行主进程相当于join(),在这期间不能再添加新进程
print('主进程%s'%os.getpid())

结果:

进程9904 is 正在运行
进程14028 is 正在运行
进程19360 is 正在运行
进程7572 is 正在运行
进程14028 is 正在运行
任务结果为:1
进程7572 is 正在运行
任务结果为:3
进程19360 is 正在运行
任务结果为:2
进程9904 is 正在运行
任务结果为:0
进程14028 is 正在运行
任务结果为:4
进程7572 is 正在运行
任务结果为:5
任务结果为:7
任务结果为:8
任务结果为:6
任务结果为:9
主进程11352

互斥锁

临界资源:一次仅允许一个进程使用的资源,比如打印机

互斥:并发执行的多个进程由于竞争同一资源(临界资源)而产生的互相排斥的现象,

为什么要加这把锁:

当并发执行的进程取修改某一项任务时,其中一个线程执行时,遇到io切换到另外的近程,

互斥锁的特点:

1.每个线程或进程在不释放锁的情况下只能accquire()一次锁,如果想要再次accqure就会卡在原地
2.互斥锁把并发变为串行,降低程序运行的速度

如下代码:

买火车票的例子

注意在多进程中不要使用全局变量,因为进程之前的内容不共享,可以使用json文件

json文件

{"count": 1}
from multiprocessing import Process, Lock
import os, time, random def buy_ticket(): with open('ticket_count.json', 'r') as f:
ticket_count = json.load(f)
if ticket_count["count"] > 0:
print('进程%s 买到票了 ' % os.getpid())
ticket_count['count'] -= 1
time.sleep(random.randint(1, 2)) # 出现io切换到下一个近程中,但是这个近程还没有打印完,其它线程开始执行,这样所有的数据都会乱了.
else:
print('进程%s 没有票了' % os.getpid())
with open('ticket_count.json', 'w') as f:
json.dump(ticket_count, f) def read_ticket():
"""
查票
:return:
"""
with open('ticket_count.json', 'r') as f:
ticket_count = json.load(f)
print("余票还剩%s" % ticket_count["count"]) def check_and_buy_ticket():
read_ticket() buy_ticket() if __name__ == '__main__':
for i in range(3):
t1 = Process(target=check_and_buy_ticket)
t1.start()

结果:

进程6664 买票开始
进程6665 买票开始
主进程6661
进程6666 买票开始
进程6665 买到票了
进程6666 买到票了
进程6664 买到票了

命名只有一张票,为啥3人都买到了,这就是数据不安全想象

解决方法

加互斥锁

注意以上方法虽然解决了数据不安全的缺点,但是运行效率变慢了,因为并发变成了串行

仅演示互斥锁额例子

加互斥锁的例子:

from multiprocessing import Process, Lock
import os, time, random def buy_ticket():
time.sleep(random.randint(1, 2)) # 出现io切换到下一个近程中,但是这个近程还没有打印完,其它线程开始执行,这样所有的数据都会乱了.
with open('ticket_count.json', 'r') as f:
ticket_count = json.load(f)
if ticket_count["count"] > 0:
print('进程%s 买到票了 ' % os.getpid())
ticket_count['count'] -= 1
else:
print('进程%s 没有票了' % os.getpid())
with open('ticket_count.json', 'w') as f:
json.dump(ticket_count, f) def read_ticket():
"""
查票
:return:
"""
with open('ticket_count.json', 'r') as f:
ticket_count = json.load(f)
print("余票还剩%s" % ticket_count["count"]) def check_and_buy_ticket(lock):
read_ticket() # lock.acquire() # 加锁 阻塞
# buy_ticket()
# lock.release() # 释放锁
with lock: # with lock 能代替acquire()release()和自动处理
buy_ticket() if __name__ == '__main__':
mutex = Lock()
for i in range(3):
t1 = Process(target=check_and_buy_ticket, args=(mutex,))
t1.start()

结果:

余票还剩1
余票还剩1
余票还剩1
进程7144 买到票了
进程7145 没有票了
进程7146 没有票了

那么互斥锁和join有什么区别呢?

join是真正的串行,必须等第一个任务执行完,第二个任务才能start()

而互斥锁可以让局部代码(指的是修改共享数据的代码)串行起来

下面我们来举例抢票的例子

使用join

from multiprocessing import Process,Lock
import time ,os ,random,json def search():
"""
查询剩余票的函数
:return:
"""
with open('ticket.txt',encoding='utf-8') as f:
dic=json.load(f)
print('进程%s 剩余票数%s'%(os.getpid(),dic['count'])) def get():
"""
抢票的函数
:return:
"""
with open('ticket.txt',encoding='utf-8') as read_f:
dic=json.load(read_f)
if dic['count']>0:
dic['count'] -= 1
time.sleep(random.randint(1,2))#模拟手速
with open('ticket.txt','w',encoding='utf-8') as write_f:
json.dump(dic,write_f)
print('%s购买成功'%(os.getpid()))
else:
print('%s失败' % (os.getpid())) def task():
search()
get() if __name__ == '__main__':
for i in range(20):
p=Process(target=task)
p.start()
p.join()

使用join

结果:

进程19244 剩余票数1
19244购买成功
进程14476 剩余票数0
14476失败
进程17520 剩余票数0
17520失败
进程18664 剩余票数0
18664失败
进程18164 剩余票数0
18164失败
进程18348 剩余票数0
18348失败
进程2232 剩余票数0
2232失败
进程15708 剩余票数0
15708失败
进程11236 剩余票数0
11236失败
进程11488 剩余票数0
11488失败
进程10448 剩余票数0
10448失败
进程3956 剩余票数0
3956失败
进程964 剩余票数0
964失败
进程11696 剩余票数0
11696失败
进程12732 剩余票数0
12732失败
进程5920 剩余票数0
5920失败
进程13520 剩余票数0
13520失败
进程16968 剩余票数0
16968失败
进程13592 剩余票数0
13592失败
进程18344 剩余票数0
18344失败

问题:只要第一个查看余票不为0,就一定能抢到票,但是假设这个人查看月后,半天都没有付款,怎么?所有人都要等他.

使用互斥锁

from multiprocessing import Process,Lock
import time ,os ,random,json def search():
"""
查询剩余票的函数
:return:
"""
with open('ticket.txt',encoding='utf-8') as f:
dic=json.load(f)
print('进程%s 剩余票数%s'%(os.getpid(),dic['count'])) def get():
"""
抢票的函数
:return:
"""
with open('ticket.txt',encoding='utf-8') as read_f:
dic=json.load(read_f)
if dic['count']>0: dic['count'] -= 1
time.sleep(random.randint(0,3))#模拟手速
with open('ticket.txt','w',encoding='utf-8') as write_f:
json.dump(dic,write_f)
print('%s购买成功'%(os.getpid())) else:
print('%s失败' % (os.getpid())) def task(mutex):
search()
mutex.acquire() # 加锁的意思
get()
mutex.release() # 释放的意思 if __name__ == '__main__':
mutex=Lock()
for i in range(20):
p=Process(target=task,args=(mutex,))
p.start()

使用互斥锁

结果:

进程10088 剩余票数1
进程16132 剩余票数1
10088购买成功
16132失败
进程884 剩余票数0
884失败
进程7940 剩余票数0
7940失败
进程14548 剩余票数0
14548失败
进程16104 剩余票数0
16104失败
进程12956 剩余票数0
12956失败
进程10264 剩余票数0
10264失败
进程19972 剩余票数0
19972失败
进程10740 剩余票数0
10740失败
进程14028 剩余票数0
14028失败
进程2392 剩余票数0
2392失败
进程11732 剩余票数0
11732失败
进程3468 剩余票数0
3468失败
进程11192 剩余票数0
11192失败
进程15936 剩余票数0
15936失败
进程5712 剩余票数0
5712失败
进程19732 剩余票数0
19732失败
进程9864 剩余票数0
9864失败
进程7316 剩余票数0
7316失败

互斥锁关于线程

我们在双十一的时候,要并发的购买同一商品时,如果不加锁

代码如下:

from threading import Thread ,Lock

import time,random

n = 100 #商品的总数为100件
def task():
global n
temp=n
time.sleep(0.1)
n=temp-1 if __name__ == '__main__':
objs=[]
for i in range(100):
t=Thread(target=task)
objs.append(t)
t.start()
for obj in objs: #添加这步的目的是为了让上边的子线程执行完,再执行主线程
obj.join() print(n)

结果:

99

应该是多少呢?应该为0,这就发生了数据安全的问题了

所以我们这时候要加互斥锁

from threading import Thread ,Lock #线程的互斥锁导入的方式不一样
import time,random n = 100 #商品的总数为100件
def task():
global n
with mutex: #这里是锁的简化写法和with open一样
temp=n
time.sleep(0.1)
n=temp-1 if __name__ == '__main__':
mutex=Lock() #这里不用给每个线程传mutex了,因为线程之间是共享内存的
objs=[]
for i in range(100):
t=Thread(target=task)
objs.append(t)
t.start()
for obj in objs: #添加这步的目的是为了让上边的子线程执行完,再执行主线程
obj.join() print(n)

结果为:

 0

互斥锁的简单写法

 with mutex: #这里是锁的简化写法和with open一样
temp=n
time.sleep(0.1)
n=temp-1

相当于:

    mutex.acquire()
temp = n
time.sleep(0.1)
n = temp - 1
mutex.release()

信号量(semaphore)

信号量:关于信号量操作系统中是这样解释的:信号量是一种数据结构,信号量的值与相应的资源的使用情况有关.

  理解的白话文这样解释:

    信号量是一个变量,控制着对公共资源或者临界区的访问。信号量维护着一个计数器,指定可同时访问资源或者进入临界区的线程数。

     每次有一个线程获得信号量时,计数器-1。若计数器为0,其他线程就停止访问信号量,直到另一个线程释放信号量

你可能会有疑问这不和进程池和线程池的概念一样吗? 有很大不一样

和进程池的区别

进程池:pool(4):代表最大只允许有4个进程同时运行. 由这4个进程把所有的任务都完成.

信号量: semaphore(4) 代表最大只允许四个进程同时访问该4资源,但是可以允许存在多个进程

总的来说:进程池控制开启进程的数量,而信号量控制进程访问某一资源的个数.

代码实现

信号量:

from multiprocessing import Process,Semaphore
import os, random,time
#from threading import Thread,Semaphore 线程使用时信号量时 def task(sem):
with sem:
print("线程%s正在上厕所" % os.getpid())
time.sleep(random.randint(0, 2))
print("线程%s上完厕所了" % os.getpid()) if __name__ == '__main__':
sem=Semaphore(4)#同一时间只允许四个进程访问该资源
for i in range(10):
p=Process(target=task,args=(sem,))
p.start()

结果:可以看到开启了10个不同的进程

线程6104正在上厕所
线程10908正在上厕所
线程6104上完厕所了
线程10908上完厕所了
线程11220正在上厕所
线程14176正在上厕所
线程14176上完厕所了
线程7152正在上厕所
线程7152上完厕所了
线程1520正在上厕所
线程20264正在上厕所
线程16440正在上厕所
线程20264上完厕所了
线程6444正在上厕所
线程11220上完厕所了
线程14668正在上厕所
线程6444上完厕所了
线程1520上完厕所了
线程16440上完厕所了
线程14668上完厕所了

进程池的代码:

from concurrent.futures import ProcessPoolExecutor
import os, random,time def task2():
print("线程%s正在上厕所" % os.getpid())
time.sleep(random.randint(0, 2))
print("线程%s上完厕所了" % os.getpid()) if __name__ == '__main__':
p=ProcessPoolExecutor()
for i in range(10):
a=p.submit(task2)

结果:

进程8156正在上厕所
进程4912正在上厕所
进程1384正在上厕所
进程14264正在上厕所
进程14264上完厕所了
进程14264正在上厕所
进程8156上完厕所了
进程8156正在上厕所
进程8156上完厕所了
进程8156正在上厕所
进程8156上完厕所了
进程8156正在上厕所
进程4912上完厕所了
进程4912正在上厕所
进程1384上完厕所了
进程1384正在上厕所
进程1384上完厕所了
进程14264上完厕所了
进程8156上完厕所了
进程4912上完厕所了

从结果上看出:只开了4个进程: 4912,8156,14264,1384

死锁现象

死锁现象也叫顶牛:各个并发线程彼此等待对方拥有的资源,且在得到对方资源前不释放自己占有的资源,就会发生死锁现象

出现原因:

1.有多个线程

2.每个线程中交替使用多把锁

from threading import Thread ,Lock,current_thread

import time , random

mutexA=Lock()
mutexB=Lock()
def f1():
with mutexA:
print('%s在f1拿到了A锁'%current_thread().getName())
with mutexB:
print('%s在f1拿到了B锁' % current_thread().getName()) def f2():
with mutexB:
print('%s在f2中拿到了B锁' % current_thread().getName())
time.sleep(0.1)#模拟线程阻塞
with mutexA:
print('%s在f2拿到了A锁' % current_thread().getName()) def task():
f1()
f2()
if __name__ == '__main__':
for i in range(10):
t=Thread(target=task)
t.start()

结果:

Thread-1在f1拿到了A锁
Thread-1在f1拿到了B锁
Thread-1在f2中拿到了B锁
Thread-2在f1拿到了A锁

线程1在f2中拿到了B锁,想要申请A锁,这时候A锁被线程2在f1中占有它想申请B锁,这时线程1不释放B锁,线程2不释放A锁,就会发生死锁现象.

这时候,我们会想既然用两把锁会发生死锁,那么干脆用1把锁不就行了

mutexA=mutexB=Lock()

别忘了互斥锁的特点每个线程只能再不释放锁的情况下只能accquire()一次,如果想要再次accqure就会发生下面的现象

Thread-1在f1拿到了A锁
....

线程1卡在这里不动了,这是因为一个线程只能获得一次锁,不能重复获得。要解决这个问题只要能让一把锁获得多次不就行了。

这时候来解决死锁的锁来了叫递归锁

递归锁

出现背景:

  1.在互斥锁中一个线程在不释放锁的情况下,无法获得另一把锁

  2.解决死锁现象

我们分析了死锁现象,那么在python中是如何解决这个死锁现象的呢?

,python提供了可重入锁RLock,也叫递归锁

递归锁特点:

    该线程可以多次获得(accquire)这把锁。
    直到该线程所有的锁都被释放了,别的线程才可以抢这把锁。
    速度慢于互斥锁.

递归锁本质:把多把锁变成了一把锁

例子:假设该线程中有两把锁.A,B,只要该线程获得A锁,就相当于获得了AB锁,直到该线程所有的锁都被释放了,别的线程才可以抢AB锁

原理:递归锁内部维护了Lock和counter变量,counter记录了accquire()的次数,当线程释放一个lock。counter就减去1,直到counter=0,也就是该线程所有的lock都被释放,别的线程才可以抢这个锁。

from threading import Thread ,Lock,current_thread,RLock

import time , random

mutexA=mutexB=RLock()

def f1():
with mutexA:
print('%s在f1拿到了A锁'%current_thread().getName())
with mutexB:
print('%s在f1拿到了B锁' % current_thread().getName())
print('%s在f1释放了B锁' % current_thread().getName())
print('%s在f1拿释放了A锁' % current_thread().getName()) def f2():
with mutexB:
print('%s在f2中拿到了B锁' % current_thread().getName())
time.sleep(0.1)
with mutexA:
print('%s在f2拿到了A锁' % current_thread().getName())
print('%s在f2释放了了A锁' % current_thread().getName())
print('%s在f2拿释放了A锁' % current_thread().getName()) def task():
f1()
f2()
if __name__ == '__main__':
for i in range(10):
t=Thread(target=task)
t.start()

结果:

Thread-1在f1拿到了A锁
Thread-1在f1拿到了B锁
Thread-1在f1释放了B锁
Thread-1在f1拿释放了A锁
Thread-1在f2中拿到了B锁
Thread-1在f2拿到了A锁
Thread-1在f2释放了了A锁
Thread-1在f2拿释放了A锁
Thread-2在f1拿到了A锁
Thread-2在f1拿到了B锁
Thread-2在f1释放了B锁
Thread-2在f1拿释放了A锁
Thread-2在f2中拿到了B锁
Thread-2在f2拿到了A锁
Thread-2在f2释放了了A锁
Thread-2在f2拿释放了A锁
Thread-4在f1拿到了A锁
Thread-4在f1拿到了B锁
Thread-4在f1释放了B锁
Thread-4在f1拿释放了A锁
Thread-4在f2中拿到了B锁
Thread-4在f2拿到了A锁
Thread-4在f2释放了了A锁
Thread-4在f2拿释放了A锁
Thread-6在f1拿到了A锁
Thread-6在f1拿到了B锁
Thread-6在f1释放了B锁
Thread-6在f1拿释放了A锁
Thread-6在f2中拿到了B锁
Thread-6在f2拿到了A锁
Thread-6在f2释放了了A锁
Thread-6在f2拿释放了A锁
Thread-8在f1拿到了A锁
Thread-8在f1拿到了B锁
Thread-8在f1释放了B锁
Thread-8在f1拿释放了A锁
Thread-8在f2中拿到了B锁
Thread-8在f2拿到了A锁
Thread-8在f2释放了了A锁
Thread-8在f2拿释放了A锁
Thread-10在f1拿到了A锁
Thread-10在f1拿到了B锁
Thread-10在f1释放了B锁
Thread-10在f1拿释放了A锁
Thread-10在f2中拿到了B锁
Thread-10在f2拿到了A锁
Thread-10在f2释放了了A锁
Thread-10在f2拿释放了A锁
Thread-5在f1拿到了A锁
Thread-5在f1拿到了B锁
Thread-5在f1释放了B锁
Thread-5在f1拿释放了A锁
Thread-5在f2中拿到了B锁
Thread-5在f2拿到了A锁
Thread-5在f2释放了了A锁
Thread-5在f2拿释放了A锁
Thread-9在f1拿到了A锁
Thread-9在f1拿到了B锁
Thread-9在f1释放了B锁
Thread-9在f1拿释放了A锁
Thread-9在f2中拿到了B锁
Thread-9在f2拿到了A锁
Thread-9在f2释放了了A锁
Thread-9在f2拿释放了A锁
Thread-7在f1拿到了A锁
Thread-7在f1拿到了B锁
Thread-7在f1释放了B锁
Thread-7在f1拿释放了A锁
Thread-7在f2中拿到了B锁
Thread-7在f2拿到了A锁
Thread-7在f2释放了了A锁
Thread-7在f2拿释放了A锁
Thread-3在f1拿到了A锁
Thread-3在f1拿到了B锁
Thread-3在f1释放了B锁
Thread-3在f1拿释放了A锁
Thread-3在f2中拿到了B锁
Thread-3在f2拿到了A锁
Thread-3在f2释放了了A锁
Thread-3在f2拿释放了A锁

我们分析下:

线程1在f1中获得了AB锁并释放后,会出现两种情况,进程中的其他线程会抢到这把递归锁来执行f1,第二种情况是:线程1继续抢到锁来执行f2.

IO模型

在学io模型之前我们要清楚同步和异步,阻塞和非阻塞的概念

阻塞和非阻塞指的是 当前线程或进程是不是被阻塞住了,

异步和同步:指的是任务调用需不需要等待前一个任务完成。

IO模型是为了能实现和gevent模块遇到io就切换的功能。

io模型的种类

    阻塞io模型
    非阻塞io模型
    io多路复用模型
    异步io模型
    信号驱动io(不常用,了解)

再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

#1)等待数据准备 (Waiting for the data to be ready)
#2)数据来了后,将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

阻塞io模型

阻塞io的特点:一旦阻塞了,任务就会卡在原地,比如我们写的服务端: 程序会在accept 和recv  这两步卡住,程序就会不动知道等到不阻塞了,才继续执行

 所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

代码:

import socket

server=socket.socket()

server.bind(('127.0.0.1',8081))
server.listen(5)
print('start....')
while True:
conn, addr = server.accept()
while True:
try:
date=conn.recv(1024)
if not date: break #这步是给linux准备的
conn.send(date.upper())
except Exception as e:
break
conn.close()
server.close()

服务端

import  socket

client=socket.socket()
client.connect(('127.0.0.1',8081)) while True:
date = input('>>>>')
client.send(date.encode())
data = client.recv(1024)
print('>>>', data.decode()) client.close()

客户端

一个简单的解决方案:

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接

该方案的问题是:

#开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

改进方案:

#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,
尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进后方案其实也存在着问题:

#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。
所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

  对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

非阻塞io模型

非阻塞io: 当我们提交任务后,系统会自动返回结果,到底有没有结果,如果没有结果,线程可以去干别的事情,每过一段时间在来询问操作系统有没有结果.

注意:非阻塞指的是io操作中的第一阶段即wait数据阶段不堵塞了, copy数据阶段还是阻塞状态

在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

非阻塞io实现单线程下的并发

import socket

server=socket.socket()

server.bind(('127.0.0.1',8081))
server.listen(5)
print('start....')
server.setblocking(False) #默认是True, False变为非阻塞了
conn_list=[]
del_conn=[]
while True:
try:
print(conn_list)
conn,addr=server.accept()
conn_list.append(conn)
except BlockingIOError : #在非阻塞的时候,accept收不到数据会发生BlockingIOError错误
for conn in conn_list:
try:
data=conn.recv(1024)
conn.send(data.upper())
except BlockingIOError :
continue
except ConnectionResetError : #这步是为了关闭某个客户端的时候不发生异常
del_conn.append(conn) #不能在循环的时候删列表,把要删除的连接放到列表中统一删除 for conn1 in del_conn:
conn_list.remove(conn1)
conn1.close()
del_conn=[]
continue
server.close()

服务端

import  socket

client=socket.socket()
client.connect(('127.0.0.1',8081)) while True:
date = input('>>>>')
client.send(date.encode())
data = client.recv(1024)
print('>>>', data.decode()) client.close()

客户端

但是我们不推荐非io阻塞模型

我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

但是也难掩其缺点:

#1. 循环调用recv(),会一直调用操作系统,将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
#2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
白话应该应该这么说:我们可以在等待任务的时候做其他的事,但是必须要等待其他事做完后才能做我们该做的事,这样影响了,我们的执行效率

    此外,在这个方案中我们多次使用try 来捕捉异常来检测到底有没有返回结果没有结果则返回异常,我们只能手动检测一个套接字,不能检测多个套接字,实际上python提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否有数据产生.

io多路复用(同步io)模型

I/O多路复用,I/O就是指的我们网络I/O,多路指多个TCP连接(或多个Channel),复用指复用一个介质。串起来理解就是io多路复用就是:多个请求用一个介质来处理io阻塞问题.其实这个介质就是(select/epoll).

io多路复用的是用来检测多个套接字是否有变化以便不同的变化做出不同的处理. 我认为比较好的解释:IO多路复用是指内核一旦发现线程指定的一个或者多个IO条件准备读取,它就通知该进程。

select,poll,epoll都是IO多路复用的机制。I/O多路复用就通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间

IO多路复用优点:

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

调:

    1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

    2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

    结论: select的优势在于可以处理多个连接,不适用于单个连接

该模型的优点:

#相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

    该模型的缺点:

#首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,
如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,
所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
#其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

单线程下 使用select实现并发:

import socket
import select server=socket.socket()
server.bind(('127.0.0.1',8081))
server.listen(5)
print('start....')
server.setblocking(False) #默认是True, False变为非阻塞了
reads_l=[server,] #监听列表
del_l=[]#删除列表
while True:
r_l,_,_= select.select(reads_l,[],[])#rlist 检测套接字,返回值是三个列表组成的元组
# print('r_l',r_l)
for obj in r_l: #由于r_L是个列表,所以要for 循环取值
if obj==server: #因为我们要检测多个套接字,所以要做判断
coon,addr=obj.accept()
reads_l.append(coon)
print(addr)
else:
date = obj.recv(1024)
obj.send(date.upper()) server.close()

服务端

客户端:

import  socket

client=socket.socket()
client.connect(('127.0.0.1',8081)) while True:
date = input('>>>>')
client.send(date.encode())
data = client.recv(1024)
print('>>>', data.decode()) client.close()

客户端

 epoll poll select 的区别?

select,poll,epoll都是i/o多路复用的具体实现方式
###select
select会修改传入的参数的数组,这个对于一个需要调用很多次的函数是非常不友好的.
select只能监视1024个连接
select 当任何一个socket对象出现了数据,select不会告诉你哪个socket有数据,需要你for循环来问,效率低
select不是线程安全,
#线程安全就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。不会出现数据不一致或者数据污染
###poll修复了select的很多问题
1.去掉了1024个连接的限制
2.poll不再修改传入的参数的数组
3.但是还不是线程安全的
4.不能准确的知道哪个select有数据,效率低
###epoll 修复了poll和select绝大多数的问题
1.线程安全
2.epoll可以准确的告诉你哪个socket里有数据,具体实现方式:当哪个socket有数据时会通知你,你不用for 循环来一一个问效率高

selectors

该模块会自动根据当前的操作系统来选择合适的io多路复用模型,比如linux就用epoll

#服务端
from socket import *
import selectors sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close() server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1) #客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088)) while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))

selectors

异步io模型

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

这种模型的效率是最高的。

网络编程并发 多进程 进程池,互斥锁,信号量,IO模型的相关教程结束。

《网络编程并发 多进程 进程池,互斥锁,信号量,IO模型.doc》

下载本文的Word格式文档,以方便收藏与打印。