多进程和多线程,Thread模块 GIL全局解释锁, 进程池与线程池,协程

2023-06-25,,

1.多进程实现TCP服务端并发:

import socket
from multiprocessing import Process def get_server():

server = socket.socket()

server.bind(('127.0.0.1', 8080))

server.listen(5)

return server def get_talk(sock):

while True:

data = sock.recv(1024)

print(data.decode('utf8'))

sock.send(data.upper())
if name == 'main':

server = get_server()

while True:

sock, addr = server.accept()

# 开设多进程去聊天

p = Process(target=get_talk, args=(sock,))

p.start()

2.互斥锁代码实操:

锁:建议只加载操作数据的部分 否则整个程序的效率会极低

from multiprocessing import Process, Lock
import time
import json
import random def search(name):

with open(r'data.json', 'r', encoding='utf8') as f:

data = json.load(f)

print('%s查看票 目前剩余:%s' % (name, data.get('ticket_num'))) def buy(name):

# 先查询票数

with open(r'data.json', 'r', encoding='utf8') as f:

data = json.load(f)

# 模拟网络延迟

time.sleep(random.randint(1, 3))

# 买票

if data.get('ticket_num') > 0:

with open(r'data.json', 'w', encoding='utf8') as f:

data['ticket_num'] -= 1

json.dump(data, f)

print('%s 买票成功' % name)

else:

print('%s 买票失败 非常可怜 没车回去了!!!' % name) def run(name, mutex):

search(name)

mutex.acquire() # 抢锁

buy(name)

mutex.release() # 释放锁
if name == 'main':

mutex = Lock() # 产生一把锁

for i in range(10):

p = Process(target=run, args=('用户%s号' % i, mutex))

p.start()

"""

锁有很多种 但是作用都一样

行锁 表锁 ...

"""

3.线程理论

进程:
    进程其实是资源单位 表示一块内存空间

线程:
    线程才是执行单位 表示真正的代码指令

我们可以将进程比喻是车间 线程是车间里面的流水线
一个进程内部至少含有一个线程

1.--->一个进程内可以开设多个线程
2.--->同一个进程下的多个线程数据是共享的
3.--->创建进程与线程的区别
      创建进程的消耗要远远大于线程


4.创建线程的两种方式

第一种:

from threading import Thread
from multiprocessing import Process
import time

def task(name):

print(f'{name} is running')

time.sleep(0.1)

print(f'{name} is over')

if name == 'main':

start_time = time.time()

# p_list = []
# for i in range(100):
# p = Process(target=task, args=('用户%s'%i,))
# p.start()
# p_list.append(p)
# for p in p_list:
# p.join()
# print(time.time() - start_time)
# t_list = []
# for i in range(100):
# t = Thread(target=task, args=('用户%s'%i,))
# t.start()
# t_list.append(t)
# for t in t_list:
# t.join()
# print(time.time() - start_time)

t = Thread(target=task, args=('jason',))

t.start()

print('主线程')

"""

创建线程无需考虑反复执行的问题

"""

第二种:

class MyThread(Thread):
def run(self):
print('run is running')
time.sleep(1)
print('run is over')
obj = MyThread()

obj.start()

print('主线程')

5.线程的诸多特性

1.------>join方法
2.------>同进程内多个线程数据共享
3.------>current_thread()
4.------>active_count()


多线程实现socket:

import multiprocessing
import threading import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)

s.bind(('127.0.0.1',8080))

s.listen(5) def action(conn):

while True:

data=conn.recv(1024)

print(data)

conn.send(data.upper()) if name == 'main':
while True:
conn,addr=s.accept()
p=threading.Thread(target=action,args=(conn,))
p.start()</code></pre>

守护线程:

主函数中的for循环和线程t是两个不同的线程,其中for循环是主线程,当把t设置为守护线程时,主线程for循环运行结束后t线程会同时结束。

6.GIL全局解释器锁

# 官方文档对GIL的解释
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once.
This lock is necessary mainly because CPython’s memory management is not thread-safe.
(However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.
"""
1.在CPython解释器中存在全局解释器锁简称GIL
python解释器有很多类型
CPython JPython PyPython (常用的是CPython解释器)
2.GIL本质也是一把互斥锁 用来阻止同一个进程内多个线程同时执行(重要)
3.GIL的存在是因为CPython解释器中内存管理不是线程安全的(垃圾回收机制)
垃圾回收机制
引用计数、标记清除、分代回收
"""
所有解释性语言都逃不开垃圾回收线程和代码线程同时执行的魔咒,都需要有锁,这个是解释语言的通病

7.验证GIL的存在

from threading import Thread

num = 100

def task():

global num

num -= 1
t_list = []

for i in range(100):

t = Thread(target=task)

t.start()

t_list.append(t)

for t in t_list:

t.join()

print(num) #0

'''多个线程之间同时操作一个数据,会出现数据错乱问题,怎么办?加锁'''
from threading import Lock
lock = Lock()
# 1. 加锁
lock.acquire()
# 2. 释放锁
lock.release()
'''加完锁之后,不管有多少个线程进来,同一时刻,只有一个线程进来,等这个线程执行完毕,释放锁之后,其他线程才能在进来''' 问题:

有了GIL锁,还会出现并发安全问题码?会,此时程序变成了串行执行,为什么还会出现并发安全问题呢?

比如:起了2个线程,执行任务:a=a+1,a以开始等于0

1. 第一个线程过来了,拿到了a,计算a=a+1,当计算结果还没有赋值回去

2. 第二个线程来了,拿到了a,此时拿到的a还是0,计算a=a+1,线程切换会第一个线程,a变为1
'''面试题:有了GIL锁,为什么还要互斥锁?'''</code></pre>

8.GIL与普通互斥锁

GIL锁与互斥锁综合分析(重点!!!):

分析:

#1. 100个线程去抢GIL锁,即抢执行权限

#2. 肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()

#3. 极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻 塞,被迫交出执行权限,即释放GIL

#4. 直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

既然CPython解释器中有GIL 那么我们以后写代码是不是就不需要操作锁了!!!
"""
GIL只能够确保同进程内多线程数据不会被垃圾回收机制弄乱
并不能确保程序里面的数据是否安全
"""
'''
多线程同时操作一个数据,数据错乱,
并发安全问题--->加锁--》让原本并发的操作,变成串行,牺牲效率,保证安全---》通过
线程queue也可以避免并发安全的问题,所有queue的本质就是锁
'''
import time
from threading import Thread,Lock num = 100 def task(mutex):

global num

mutex.acquire()

count = num

time.sleep(0.1)

num = count - 1

mutex.release()
mutex = Lock()

t_list = []

for i in range(100):

t = Thread(target=task,args=(mutex,))

t.start()

print(t.name) # 线程名

print(t.getName()) # 线程名

t.setName('aaa') # 改线程名

t.is_alive() # 查看线程是否存回

t_list.append(t)

for t in t_list:

t.join()

print(num)

9.python多线程是否有用

需要分情况
情况1
单个CPU
多个CPU
情况2
IO密集型(代码有IO操作)
计算密集型(代码没有IO)
1.单个CPU

IO密集型

多进程

申请额外的空间 消耗更多的资源

多线程

消耗资源相对较少 通过多道技术

ps:多线程有优势!!!

计算密集型

多进程

申请额外的空间 消耗更多的资源(总耗时+申请空间+拷贝代码+切换)

多线程

消耗资源相对较少 通过多道技术(总耗时+切换)

ps:多线程有优势!!!

2.多个CPU

IO密集型

多进程

总耗时(单个进程的耗时+IO+申请空间+拷贝代码)

多线程

总耗时(单个进程的耗时+IO)

ps:多线程有优势!!!

计算密集型

多进程

总耗时(单个进程的耗时)

多线程

总耗时(多个进程的综合)

ps:多进程完胜!!!

10死锁现象


from threading import Thread,Lock
import time mutexA = Lock() # 产生一把锁

mutexB = Lock() # 产生一把锁 class MyThread(Thread):

def run(self):

self.func1()

self.func2()
def func1(self):
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexB.acquire()
print(f'{self.name}抢到了B锁')
mutexB.release()
print(f'{self.name}释放了B锁')
mutexA.release()
print(f'{self.name}释放了A锁') def func2(self):
mutexB.acquire()
print(f'{self.name}抢到了B锁')
time.sleep(1)
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexA.release()
print(f'{self.name}释放了A锁')
mutexB.release()
print(f'{self.name}释放了B锁')

for i in range(10):

obj = MyThread()

obj.start()

'''

打印结果:

Thread-1抢到了A锁

Thread-1抢到了B锁

Thread-1释放了B锁

Thread-1释放了A锁

Thread-1抢到了B锁

Thread-2抢到了A锁

'''

 当有一个线程抢到了A锁时(第一把锁),后面的锁就全是他的,直到碰到一个IO操作时,就会有另一个进程抢到了A锁,当IO操作执行完时,第一个线程需要A锁,第二个线程需要B锁,程序就会卡住


线程队列:

"""
同一个进程下多个线程数据是共享的
为什么先同一个进程下还会去使用队列呢
因为队列是
管道 + 锁
所以用队列还是为了保证数据的安全
""" from multiprocessing import Queue

进程队列:Queue import queue

线程队列:queue """

进程Queue用于父进程与子进程(或同一父进程中多个子进程)间数据传递

python自己的多个进程间交换数据或者与其他语言(如Java)进程queue就无能为力
queue.Queue 的缺点是它的实现涉及到多个锁和条件变量,因此可能会影响性能和内存效率。

"""

线程队列有3种:

1. 先进先出

2. 先进后出

3. 优先级队列

'''优先级队列'''

q = queue.PriorityQueue()

put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高

q.put((5, 'a')) # 元组里面的第一个数字代表的是优先级,第二个元素是具体的数据

q.put((15, 'b'))

q.put((10, 'c'))

print(q.get())

print(q.get())

print(q.get())

'''

结果(数字越小优先级越高,优先级高的优先出队):

(10, 'b')

(20, 'a')

(30, 'c')

'''

11.信号量

在python并发编程中信号量相当于多把互斥锁(公共厕所)

同一时间可以有5个线程去抢

from threading import Thread, Lock, Semaphore
import time
import random sp = Semaphore(5) # 一次性产生五把锁 class MyThread(Thread):

def run(self):

sp.acquire()

print(self.name)

time.sleep(random.randint(1, 3))

sp.release()
for i in range(20):

t = MyThread()

t.start()

12.event事件

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

from threading import Thread, Event
import time event = Event() # 类似于造了一个红绿灯 def light():

print('红灯亮着的 所有人都不能动')

time.sleep(3)

print('绿灯亮了 油门踩到底 给我冲!!!')

event.set() def car(name):

print('%s正在等红灯' % name)

event.wait()

print('%s加油门 飙车了' % name)
t = Thread(target=light)

t.start()

for i in range(20):

t = Thread(target=car, args=('熊猫PRO%s' % i,))

t.start()

13.进程池与线程池

进程和线程能否无限制的创建 不可以
因为硬件的发展赶不上软件 有物理极限 如果我们在编写代码的过程中无限制的创建进程或者线程可能会导致计算机奔溃

池:盛放更多个对象的,盛放更多的进程和线程

'''进程池和线程池都是提供的是异步调用,只需要往池子里丢任务,我们不需要等待结果,等池子里面的任务完成之后,内部做了一个回调,告诉我们执行后的结果!!!'''

进程池:

提前创建一个池子,这个池子里面先创建号一怼进程,然后,只需要向池子里面丢任务就行.

线程池:

提前创建一个池子,这个池子里面先创建号一怼线程,然后,只需要向池子里面丢任务就行.

创建池的目的:节省资源,防止内存被沾满的情况,另外就是也能提升效率,但是不能无限的开进程.


降低程序的执行效率 但是保证了计算机硬件的安全
进程池
提前创建好固定数量的进程供后续程序的调用 超出则等待
线程池
提前创建好固定数量的线程供后续程序的调用 超出则等待
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import os
import time
import random
from threading import current_thread # 1.产生含有固定数量线程的线程池
# pool = ThreadPoolExecutor(10)
pool = ProcessPoolExecutor(5) def task(n):
print('task is running')
# time.sleep(random.randint(1, 3))
# print('task is over', n, current_thread().name)
# print('task is over', os.getpid())
return '我是task函数的返回值' def func(*args, **kwargs):
print('from func') if __name__ == '__main__':
# 2.将任务提交给线程池即可
for i in range(20):
# res = pool.submit(task, 123) # 朝线程池提交任务
# print(res.result()) # 不能直接获取
# pool.submit(task, 123).add_done_callback(func)

异步:任务提交之后不给你等待任务的结果,但是我们需要任务的结果我们需要,这个时候可以添加一个异步回调机制


14.协程

"""
进程:资源单位
线程:执行单位
协程:单线程下实现并发(效率极高)
在代码层面欺骗CPU 让CPU觉得我们的代码里面没有IO操作
实际上IO操作被我们自己写的代码检测 一旦有 立刻让代码执行别的
(该技术完全是程序员自己弄出来的 名字也是程序员自己起的)
核心:自己写代码完成切换+保存状态
"""
import time
from gevent import monkey; monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁)

from gevent import spawn def func1():

print('func1 running')

time.sleep(3)

print('func1 over') def func2():

print('func2 running')

time.sleep(5)

print('func2 over')
if name == 'main':

start_time = time.time()

# func1()

# func2()

s1 = spawn(func1) # 检测代码 一旦有IO自动切换(执行没有io的操作 变向的等待io结束)

s2 = spawn(func2)

s1.join()

s2.join()

print(time.time() - start_time) # 8.01237154006958 协程 5.015487432479858

15.协程实现并发

import socket
from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁)
from gevent import spawn def communication(sock):

while True:

data = sock.recv(1024)

print(data.decode('utf8'))

sock.send(data.upper()) def get_server():

server = socket.socket()

server.bind(('127.0.0.1', 8080))

server.listen(5)

while True:

sock, addr = server.accept() # IO操作

spawn(communication, sock) s1 = spawn(get_server)

s1.join()
如何不断的提升程序的运行效率

多进程下开多线程 多线程下开协程

GIL锁:(重点)

Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行

对Python解释器的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

    我们写的代码其实是解释器在执行:

    cpython解释器,pypy解释器... GIL锁只存在cpython解释器中,在其他解释器中不存在 起一个垃圾回收的线程,起一个正常执行代码的线程,当垃圾回收线程还没有把垃圾回收完毕的时候,会出现抢占资源的情况。 我们就需要拿到GIL锁,要想让线程能够正常执行,那么,线程就必须要拿到这把锁(GIL锁) 保证了,同一时刻,只有一个线程在执行。

    ##########################理解记忆########################################## python有GIL锁的原因,限制多线程同时执行,同一个进程下多个线程实际上同一时刻,只有一个线程在执行 只有在python上开进程用的多,其他语言一般不开多进程,只开多线程就够了 cpython解释器开多线程不能利用多核优势,只有开多进程才能利用多核优势,其他语言不存在这个问题 8核cpu电脑,充分利用起我这个8核,至少起8个进程,每个进程有一个线程,8条线程全是计算--->计算机cpu使用率是100%, 如果不存在GIL锁,一个进程下,开启8个线程,它就能够充分利用cpu资源,跑满cpu cpython解释器中好多代码,模块都是基于GIL锁机制写起来的,改不了了---》我们不能有8个核,但我现在只能用1核,----》开启多进程---》每个进程下开启的线程,可以被多个cpu调度执行 cpython解释器:io密集型使用多线程,计算密集型使用多进程

    # -io密集型,遇到io操作会切换cpu,假设你开了8个线程,8个线程都有io操作---》io操作不消耗cpu---》一段时间内看上去,其实8个线程都执行了

-计算密集型,消耗cpu,如果开了8个线程,第一个线程会一直占着cpu,而不会调度到其他线程执行,其他7个线程根本没执行,所以我们开8个进程,每个进程有一个线程,8个进程下的线程会被8个cpu执行,从而效率高

##########################理解记忆#######################################

'''

进程:可以利用多核,

线程:没办法利用多核,
i/o密集型:不需要计算,一般不消耗CPU,所以就选择线程比较好
计算密集型:在消耗CPU,所以做好选择进程

'''

for i in range(1000000000000000000000000000000000000000000000):

i+=1

print(i)

import time

for i in range(1000000000000000000000000000000000000000000000):

time.sleep(1)

print(i)

多进程和多线程,Thread模块 GIL全局解释锁, 进程池与线程池,协程的相关教程结束。

《多进程和多线程,Thread模块 GIL全局解释锁, 进程池与线程池,协程.doc》

下载本文的Word格式文档,以方便收藏与打印。