python线程相关
1、线程
线程是进程里的执行单位,进程是一个资源单位,一个内存空间的描述,相当于车间,进程是实际工作的,相当于流水线,流水线需要在车间里面,车间本身不干活,是流水线在干活。
每个进程至少都有一个线程,一个进程内可以有多个线程,线程执行过程中所需要的资源都去进程索要,同一个进程内的线程数据是共享的。
在一个进程内开设多个线程无需再次申请内存空间及拷贝代码的操作。
开设线程的开销要比开多进程小很多。
2、开启线程的两种方式
from threading import Thread
from multiprocessing import Process
############ 方式一 #####################################################
def task():
print('1111')
# 开启线程
t = Thread(target=task)
t.start()
print('下一步')
############ 方式二 #####################################################
class MyThread(Thread):
def run(self):
print('class')
if __name__ == '__main__':
t = MyThread()
t.start()
3、TCP使用线程实现并发
import socket
from threading import Thread
from multiprocessing import Process
server = socket.socket()
server.bind(('127.0.0.1', 8080))
def talk(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError as e:
break
while True:
conn, addr = server.accept()
# 开线程
t = Thread(target=talk, args=(conn, ))
t.start()
# 客户端代码
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
client.send('hello')
data = client.recv(1024)
print(data.decode('utf-8'))
4、线程的join
from threading import Thread
import time
def task():
print('1111')
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t.join() # 主线程等待子线程结束之后再继续执行
print('主')
5、同进程数据共享
同一个进程下的线程的数据是共享的
from threading import Thread
import time
money = 100
def task():
global money
money = 11
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t.join()
print(money)
6、线程的属性和方法
获取线程名
from threading import Thread, current_thread
def task():
print(current_thread().name)
# 开启线程
if __name__ == '__main__':
t = Thread(target=task)
t2 = Thread(target=task)
t.start()
t2.start()
print(current_thread().name)
'''
Thread-1
Thread-2
MainThread
'''
统计当前活跃的线程数
from threading import Thread, active_count
import time
def task():
time.sleep(2)
# 开启线程
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t2 = Thread(target=task)
t2.start()
print(active_count()) # 3
7、守护线程
主线程运行结束之后不会立刻结束,会等待其他所有的非守护子线程结束才会结束,因为主线程的结束意味着所在的进程结束。
from threading import Thread
import time
def task():
print('start')
time.sleep(2)
print('over')
# 开启线程
if __name__ == '__main__':
t = Thread(target=task)
t.daemon = True
t.start()
print('2222222222222')
8、线程互斥锁
from threading import Thread, Lock
import time
money = 100
mutex = Lock()
def task():
global money
mutex.acquire()
tmp = money
time.sleep(1)
money = tmp - 1
mutex.release()
if __name__ == "__main__":
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
9、GIL 全局解释器锁
python解释器有多个版本
- Cpython
- Jpython
- Pypypython
但是普遍使用Cpython,在Cpython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程同时执行,就导致了同一个进程下的多个线程无法利用多核优势
因为cpython中的内存管理(垃圾回收机制/GC)不是线程安全的
- 因为垃圾回收机制是一条线程,而线程的执行不是有序的,如果线程可以多核运行,可能会出现垃圾回收机制线程优先抢到数据,然后GC发现这个变量没有被引用啊,直接给回收了,后续线程来了,人都麻了,没变量数据了。所以是不安全的,所以需要GIL
GIL的重点:
- GIL不是python的特点而是Cpython解释器的特点
- GIL是保证解释器级别的数据的安全
- GIL会导致同一个进程下的多个线程的无法同时执行,即无法利用多核优势
- 针对不同的数据还是需要加不同的锁处理
- 解释性的语言的通病,同一个进程下的线程无法利用多核优势,因为解释器语言是解释一行代码执行一行代码,无法做到整体的编译完了之后再调整,每一行执行的时候不知道下一行会发生什么。
9.1 GIL和普通互斥锁的区别
GIL无法保证数据的安全,只能保证垃圾回收机制的线程的安全,所以在操作数据的时候,还是需要额外加一把锁,当线程抢到GIL的时候,然后遇到IO释放,还可以通过自己的锁锁住,不会被别的线程抢走,从而导致数据错乱。
10、多进程与多线程
不考虑单核,因为单核都差不多。只考虑多核CPU的情况
- IO密集型:在IO密集型的时候多线程耗时较短,因为节省了开进程的消耗。
- 计算密集型:计算密集型里面多进程耗时较短,因为多个核肯定比一个核算的快。
实际中,通常是多进程里面开多线程。
11、死锁与递归锁
死锁:在操作锁的时候很容易产生死锁现象,程序阻塞卡死。
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('%s抢到A锁'%self.name)
mutexB.acquire()
print('%s抢到B锁' % self.name)
mutexB.release()
print('%s释放A锁' % self.name)
mutexA.release()
print('%s释放B锁' % self.name)
def func2(self):
mutexB.acquire()
print('%s抢到B锁' % self.name)
time.sleep(2)
mutexA.acquire()
print('%s抢到A锁' % self.name)
mutexB.release()
print('%s释放B锁' % self.name)
mutexA.release()
print('%s释放A锁' % self.name)
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
递归锁:可以被连续的require()和release(),但是只能被第一个抢到的人执行,它的内部有一个计数器,没require一次+1,没release一次-1,只要计数器不为0,其他人就无法抢锁。
from threading import Thread, Lock, Rlock
import time
# mutexA = Lock()
# mutexB = Lock()
mutexA = mutexB = Rlock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('%s抢到A锁'%self.name)
mutexB.acquire()
print('%s抢到B锁' % self.name)
mutexB.release()
print('%s释放A锁' % self.name)
mutexA.release()
print('%s释放B锁' % self.name)
def func2(self):
mutexB.acquire()
print('%s抢到B锁' % self.name)
time.sleep(2)
mutexA.acquire()
print('%s抢到A锁' % self.name)
mutexB.release()
print('%s释放B锁' % self.name)
mutexA.release()
print('%s释放A锁' % self.name)
if __name__ == '__main__':
for i in range(10):
t = MyThread()
t.start()
12、信号量
在不同阶段可能对应不同的技术点,在并发编程中信号量指的是锁。
互斥锁只有一个锁,所有的线程都抢一把锁,信号量代表有多把锁,多个线程抢多个锁。
from threading import Thread, Semaphore
import time
import random
# 括号内的参数是几就有几把锁
sm = Semaphore(4)
def task(name):
sm.acquire()
print('%s抢到锁'%name)
time.sleep(random.randint(1,5))
sm.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task, args=('%s号'%i,))
t.start()
13、Event事件
类似一个发布订阅,当set()了,wait()卡住的地方才会放行。
from threading import Thread, Event
import time
event = Event()
def light():
print('等待中')
time.sleep(3)
# 等待完成
event.set()
def car(name):
# 等待中,等到set()执行后,执行wait下面的代码
event.wait()
print('%s 执行'%name)
if __name__ == '__main__':
t = Thread(target=light)
t.start()
for i in range(5):
t = Thread(target=car, args=(i,))
t.start()
14、池
池:是用来保证计算机硬件安全的情况下最大限度的利用计算机,它降低了程序的运行效率但是保证了计算机硬件的安全,从而让程序能正常运行。
14.1 线程池
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 默认cpu个数的5倍的线程量
# 池子里固定只有5个线程,这五个线程不会出现重复创建和销毁的过程
pool = ThreadPoolExecutor(5)
def task(n):
time.sleep(0.1)
return n
# 提交任务 异步
# res = pool.submit(task,1)
# 获取返回值,调用这个方法会使并发变成串行,异步变成同步
for i in range(10):
res = pool.submit(task, i)
print(res.result())
这样可以依次获取提交的数据,只是本是并发的任务变成了串行,需要加一个list,后面再执行
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 默认cpu个数的5倍的线程量
# 池子里固定只有5个线程,这五个线程不会出现重复创建和销毁的过程
pool = ThreadPoolExecutor(5)
def task(n):
print(n)
time.sleep(2)
return n
# 提交任务 异步
# res = pool.submit(task,1)
# 获取返回值,调用这个方法会使并发变成串行,异步变成同步
t_list = []
for i in range(10):
res = pool.submit(task, i)
t_list.append(res)
for t in t_list:
print('>>>>',t.result())
现在可以异步,但是是交叉执行的,需要等提交的任务执行完毕,然后再执行获取结果
# 加一行这个代码
pool.shutdown()
for t in t_list:
print('>>>>',t.result())
14.2 进程池
和线程池用法基本上一直,把ThreadPoolExecutor()
换成ProcessPoolExecutor()
就可以了。
差别是线程池默认是CPU个数的五倍,而进程池默认是CPU的个数,进程池里面的进程也不会重复创建和销毁。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, os
pool = ProcessPoolExecutor(5)
def task(n):
# 查看进程id号
print(n, os.getpid())
time.sleep(1)
return n
if __name__ == "__main__":
for i in range(10):
pool.submit(task, i)
不想手动收集返回结果,更多的是使用回调来获取执行成功的结果。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ProcessPoolExecutor(5)
def task(n):
time.sleep(1)
return n
def cb(n):
print('cb=>>>',n.result())
if __name__ == "__main__":
for i in range(10):
pool.submit(task, i).add_done_callback(cb)
15、协程
可以在单线程下实现并发,程序员在代码层面检测IO操作,一旦遇到IO操作,在代码级别进行切换,这样cpu就会一直在运行,提升程序的运行效率。
检测IO,使用gevent
模块
pip3 install gevent
gevent模块本身无法检测常见的IO操作,需要额外导入一下内容才可以
from gevent import monkey
monkey.patch_all()
from gevent import spawn
# 或者
from gevent import monkey; monkey.patch_all()
from gevent import spawn
# 使用
def func();
pass
# 监控
res = spawn(func)
res.join()
16、队列
队列默认是先进先出
# 先进先出
q = queue.Queue()
q.put()
q.get()
q.full()
# 后进先出
q = queue.LifoQueue()
# 优先级
q = queue.PriorityQueue()
# 第一个参数是优先级,第二个是数据,数字越小优先级越高
q.put((1, 'data'))