侧边栏壁纸
博主头像
AngYi 博主等级

行动起来,活在当下

  • 累计撰写 68 篇文章
  • 累计创建 28 个标签
  • 累计收到 24 条评论

目 录CONTENT

文章目录

Python 多线程

AngYi
2022-03-31 / 0 评论 / 0 点赞 / 252 阅读 / 0 字

多线程Thread、多进程Process、多协程Coroutine

CPU密集型任务

cpu密集型任务也叫计算型密集任务,是指I/O在很短的时间内可以完成,而CPU的处理时间很长,cpu占用率会很高。例如加密解密,压缩和解压缩,正则表达式搜索等。

IO密集型任务

是指任务会对硬盘/内存的读取大大增加,cpu的占用率很低。如文件处理程序,网络爬虫程序,读写数据库等。

多线程 多进程 多协程之间的对比

技术选择

全局解释器锁 (GIL)

即使电脑有多核CPU,单个时刻也只能运行1个线程,遇到io之后转换到另一个线程运行。
为什么要有这个GIL,为了解决多线程之间数据完整性和状态同步问题。有了GIL简化了Python对于共享资源的管理。

这里有两句话非常好的解释了这些疑难点:

  • Python的线程是有GIL锁的,所以是单核的。
  • Python的进程每个都有独立的GIL锁,所以是多核的,但是每个都需要独立的分配内存空间,所以空间占用大
    这也是为什么说的python是假的多线程,线程其实只能运行一个,只不过是在不停切换,所以说python多线程是并发的。而多进程才是真正并行的。

并发可以理解为你一个人烧着水的同时,不会等待,而是去切菜。遇到io时切换到另一个任务,导致cpu会一直在执行任务,不会等待,减少时间浪费。、
并行可以理解为直接开辟了另一个厨房,跟你在同步进行,你烧水,他也烧水。真正的并行处理,只需要一道菜的时间就能做出两道菜。

Python的并发编程模块

多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会单线程等待io时间
多进程:multiprocessing,利用多核cpu的能力,真正的并行执行任务
异步io:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行。

Python 创建多线程的方法

通过 threading.Thread方法 创建

先来一个简单案例,写两个函数,实现一边跳舞一边唱歌:

import threading
import time 

def dance():
    for i in range(5):
        print("dancing.....")
        time.sleep(0.1)
    
def sing():
    for i in range(5):
        print("singing.....")
        time.sleep(0.1) # 等待 就会切换进程
    
    
if __name__ == "__main__":
    t1 = threading.Thread(target=dance,) # 注意函数不用写括号
    t2 = threading.Thread(target=sing)
    
    t1.start()
    t2.start()

创建了线程,就是这个函数或者这个功能,可以被丢在后台执行了,不用等他了,可以执行别的了。start方法之后才被丢到后台执行。
下面这个例子说明多线程爬虫的好处

import requests
import threading

def craw(url):
    r = requests.get(url)
    print(url, len(r.text))
    
    
def multi_thread(urls):
    threads = []
    for url in urls:
        threads.append(
            threading.Thread(target=craw, args=(url,))
        )
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
        
        
def single_thread(urls):
    for url in urls:
        craw(url)
        
        
if __name__ == '__main__':
    urls = [
        f'https://www.cnblogs.com/#/p{page}'
        for page in range(1, 50 + 1)
    ]
    import time
    start = time.time()
    single_thread(urls)
    end = time.time()
    print('single thread cost: ', end - start, 'seconds')
    print("======================= ")
    start = time.time()
    multi_thread(urls)
    end = time.time()
    print('multi thread cost: ', end - start, 'seconds')

单线程大约耗时10秒,多线程1秒
线程的执行没有前后顺序,操作系统说了算。当调用thread的时候,不会创建线程,调用start方法才会创建线程并开始执行。

通过继承Thread类创建线程

import threading
import time

'''
如果代码逻辑复杂,就用类来写
'''
class MyThread(threading.Thread):
    '''
    要实现多线程类 就必须复写 run 方法,在这里面调用其他函数
    
    '''
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "i am "+self.name+'@'+str(i)
            print(msg)
        # 或者调用其他方法

if __name__ == "__main__":
    t = MyThread()
    t.start() # 多线程启动的时候还是 用start方法

互斥锁 解决资源竞争问题

利用lock来解决线程安全问题,锁的概念可以理解为通行证,只有一个线程能拥有这个证,能对这个变量进行操作,其他线程无法对其进行操作,从而避免线程切换带来的诸多问题。

同时修改一个变量

先来看一个简单的有问题的例子,用两个函数分别对同一个变量进行相加:

import threading
import time

global_num = 0

def test1(tem):
    global global_num
    for i in range(tem):
        global_num += 1


def test2(tem):

    global global_num
    for i in range(tem):
        global_num += 1


if __name__ == "__main__":
    t1 = threading.Thread(target=test1, args=(1000000,))  # 注意args 为 元组 要加逗号
    t2 = threading.Thread(target=test1, args=(1000000,))

    t1.start()
    t2.start()
    time.sleep(5)
    print(global_num)  # 输出并不是2000000

其实这个问题的根本是操作没有做完,要不你把你的做完,要不就别做。跟数据库的事务相似。线程如果是一个写,一个读一般情况不会出问题,但是同时修改就出问题了,那怎么解决这个问题呢,就是用锁;这里用到一种锁,互斥锁。我这个线程再用这个变量的时候,就锁上了,别的线程用不了。这个锁只能锁一次。

import threading
import time

global_num = 0
lock = threading.Lock()


def test1(tem):

    global global_num

    for i in range(tem):
        # 上锁  有一个原则,锁起来的代码越少越好,比放在for外面要好
        lock.acquire()
        global_num += 1
        # 释放锁
        lock.release()
    print(global_num)


def test2(tem):

    global global_num
    for i in range(tem):
        # 上锁
        lock.acquire()
        global_num += 1
        # 释放锁
        lock.release()


if __name__ == "__main__":
    t1 = threading.Thread(target=test1, args=(1000000,))  # 注意args 为 元组 要加逗号
    t2 = threading.Thread(target=test1, args=(1000000,))

    t1.start()
    t2.start()
    time.sleep(5)
    print(global_num)
# 但是这样做跟单线程不是一样的了吗,多线程还有什么意义?

下面这个介绍接近实际的例子,比如银行取钱。

银行取钱业务

import time
import threading
class Account(object):
    def __init__(self, account):
        self.account = account
        
def draw_money(account, amount):
    if account.account >= amount:
        time.sleep(1)
        account.account -= amount
        print(threading.current_thread().name, "取钱成功")
        print(threading.current_thread().name, f"剩余金额{account.account}")
    else:
        print(threading.current_thread().name, "取钱失败")
        
        
if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(target=draw_money, args=(account, 800))
    tb = threading.Thread(target=draw_money, args=(account, 800))
    ta.start()
    tb.start()

输出:

/Users/angyi/miniforge3/envs/flask/bin/python /Users/angyi/Documents/Flask/multiprocess_learn/06_lock_concurrent.py
Thread-1 取钱成功
Thread-1 剩余金额200
Thread-2 取钱成功
Thread-2 剩余金额-600
Process finished with exit code 0

取钱去了两次都成功了,很明显账户余额出问题了,本来就1000块钱,让你取了1600,两次都成功了。
lock 解决线程安全问题

import time
import threading
lock = threading.Lock()


class Account(object):
    def __init__(self, account):
        self.account = account
        
        
def draw_money(account, amount):
    with lock: # 上下文lock
        if account.account >= amount:
            time.sleep(1)
            account.account -= amount
            print(threading.current_thread().name, "取钱成功")
            print(threading.current_thread().name, f"剩余金额{account.account}")
        else:
            print(threading.current_thread().name, "取钱失败")
            
            
if __name__ == '__main__':
    account = Account(1000)
    ta = threading.Thread(target=draw_money, args=(account, 800))
    tb = threading.Thread(target=draw_money, args=(account, 800))
    ta.start()
    tb.start()

输出:

/Users/angyi/miniforge3/envs/flask/bin/python /Users/angyi/Documents/Flask/multiprocess_learn/06_lock_concurrent.py
Thread-1 取钱成功
Thread-1 剩余金额200
Thread-2 取钱失败
Process finished with exit code 0

这样就保证了线程安全性。

避免死锁

什么情况会产生死锁?
在线程共享多个资源的时候,如果两个线程分别占有一部分资源,并且同时等待对方释放资源,就会造成死锁。
死锁是一种状态。
比如有两个锁,有两个线程,每个线程刚开始都上了各自一个锁,接下来却都想上另一个锁,那就必须等待对方释放资源,这样就造成了死锁。
为了避免出现这种情况,有两种方法

  • 添加超时等待
  • 程序设计避免-银行家算法

生产者消费者模式爬虫

  1. 多组件的Pipeline技术
  2. 生产者消费者架构
  3. 多线程数据通信的queue.Queue
import queue
import random
import time
import threading
import requests
from bs4 import BeautifulSoup

def craw(url):
    r = requests.get(url)
    return r.text

def parse(html):
    soup = BeautifulSoup(html, 'html.parser')
    links = soup.find_all('a', class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]
  
  
# 生产者
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
    while True:
        url = url_queue.get()
        html = craw(url)
        html_queue.put(html)
        print("生产者处理", f"当前线程名字{threading.current_thread().name}**当前html队列长度{html_queue.qsize()}")
        

# 消费者
def do_parse(html_queue: queue.Queue, fout):
    while True:
        html = html_queue.get()
        results = parse(html)
        for result in results:
            fout.write(str(result) + "\n")
        time.sleep(random.randint(1, 2))
        print("消费者处理", f"当前线程名字{threading.current_thread().name}---当前队列长度{html_queue.qsize()}")
        
        
if __name__ == '__main__':
    urls = [
        f'https://www.cnblogs.com/#/p{page}'
        for page in range(1, 50 + 1)
    ]
    html_queue = queue.Queue()
    url_queue = queue.Queue()
    for url in urls:
        url_queue.put(url)
    
    # 两个生产者线程
    for i in range(2):
        t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craw {i}")
        t.start()
    
    # 三个消费者线程
    fout = open('txt.txt', 'w')
    for j in range(3):
        t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse {i}")
        t.start()
0

评论区