Python多段程和多过程(一) GIL锁和应用Thread建立

摘要: 所属部位: > Python > Python多段程和多过程(一) GIL锁和应用Thread建立多段程 Python中的GIL锁GIL 全局性表述器锁python中一个进程相匹配于c語言中的一个进程。GIL锁是“一个过程有且唯一一...

所属部位: > Python > Python多段程和多过程(一) GIL锁和应用Thread建立多段程

Python中的GIL锁

GIL 全局性表述器锁

python中一个进程相匹配于c語言中的一个进程。

GIL锁是“一个过程有且唯一一个的锁,该锁用以操纵多段程同一時刻只有有一个进程应用CPU”

GIL促使同一時刻,仅有一个进程运作在一个CPU上。这寓意着一个过程中有个进程只有采用一个CPU而没法将好几个进程投射到好几个CPU上(即便你的电脑上有多核CPU),因此这一过程中的多段程是高并发的而并不是并行处理的(即同一時刻仅有一个进程在运作)。

自然,GIL的存有不寓意着大家不用开展进程间的同歩,由于即便是单核测算机退出程的高并发也会导致資源的市场竞争。

GIL锁功效的全部全过程是那样的:
一个进程A要想应用CPU来实行程序就需要先取得过程中的GIL锁,取得GIL锁的进程才可以够实行,别的进程没法实行。
当A实行到時间片完毕或是即便時间片沒有完毕可是A碰到堵塞(如IO实际操作,或是等候别的进程互斥锁的释放出来等状况),A便会释放出来GIL锁,让另外一个进程B占据这一锁,那样进程B便可以占据CPU并运作。
根据所述方法,好几个进程间轮着获得GIL锁和CPU高并发的运作(能够了解为CPU是被GIL锁定的,假如进程B要应用CPU务必等进程A释放出来GIL锁才可以应用CPU)。

因而,GIL锁的释放出来只需考虑下列2个标准中的一个:
1.进程的時间片应用结束(或是运作完一定行数的字节数码)
2.进程碰到堵塞/等候的情况,这时即便時间片沒有用完也会释放出来GIL锁

   
一些人要有一个错误观念,觉得一个进程彻底实行完才会释放出来GIL锁给别的进程实行。那样是错的,那样多段程也不是高并发只是串行通信了。


==================================================

多段程程序编写

下边以网络爬虫做为事例,网络爬虫是一个很合适应用多段程进行的每日任务1. 根据Thread类+涵数建立进程

# coding=utf-8
from threading import Thread
from time import sleep,time
def get_detail_url():

    sleep(1)    # 根据sleep仿真模拟抓取全过程

def get_detail_content():

    sleep(2)    # 根据sleep仿真模拟抓取全过程

if __name__ == "__main__":
    st = time()
    print("主进程承担记时")
    t1 = Thread(target=get_detail_url)
    t2 = Thread(target=get_detail_content)
    t1.start()
    t2.start()
    et = time()

    st = time()
    print("主进程承担记时")
    t1 = Thread(target=get_detail_url)
    t2 = Thread(target=get_detail_content)
    t1.start()
    t2.start()
   
    t1.join()
    t2.join()   # 会等候t1,t2运作完
   
    et = time()

    st = time()
    print("主进程承担记时")
    t1 = Thread(target=get_detail_url)
    t2 = Thread(target=get_detail_content)
    t1.setDaemon(True)      # 设成守卫进程
    t2.setDaemon(True)      # 设成守卫进程
    t1.start()
    t2.start()
    et = time()
    print("每日任务完毕,用时:%.2f" % (et-st))

from threading import Thread
from time import sleep,time
class GetDetailUrl(Thread):
    def __init__(self,name):
        super(GetDetailUrl,self).__init__()
        self.name=name
    def run(self):

        sleep(1)    # 根据sleep仿真模拟抓取全过程

class GetDetailContent(Thread):
    def __init__(self,name):
        super(GetDetailContent,self).__init__()
        self.name=name
    def run(self):

        sleep(2)    # 根据sleep仿真模拟抓取全过程

if __name__ == "__main__":
    st = time()
    print("主进程承担记时")
    t1 = GetDetailUrl(name="get_detail_url")
    t2 = GetDetailContent(name="get_detail_content")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    et = time()

from threading import Thread
from time import sleep,time

is_finished = False    
def get_detail_content(urls,name):
    global is_finished

    while not is_finished or len(urls): # 假如生产制造者沒有生产制造完或是生产制造者生产制造完后但消費者沒有消費完就从urls取下url开展抓取
        try:
            url = urls.pop()
            sleep(0.001)    # 抓取一个內容页花0.00一秒

        except:     # 这儿是以便避免消費者消費太快,当生产制造者仍在生产制造但urls原素为0时,pop会出错。这时应当再次分辨urls中是不是有原素
            continue

def get_detail_url(urls):
    global is_finished


    page_url_num = 100
    start_id = 1
    while start_id 10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id): # 假定有100页目录页,每一页有一百个url,共10000个url
&" % id
            urls.append(url)
        sleep(0.01)    # 爬一个目录页花0.0一秒
        start_id = end_id
    is_finished=True

if __name__ == "__main__":
    st = time()
    print("主进程承担记时")
    # 建立一个生产制造者进程
    producer = Thread(target=get_detail_url,args=(urls,))
    consumers = []
    for i in range(3):  # 建立3个消費者进程
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumers.append(consumer)

    producer.start()     sleep(0.1)  # 睡0.一秒是以便让生产制造者老先生产些连接到urls中     for consumer in consumers:         consumer.start()     producer.join()     for consumer in consumers:         consumer.join()     et = time()     print("每日任务完毕,用时:%.2f" % (et-st))


   
上边的程序中有生产制造者 get_detail_url, 也有消費者 get_detail_content
生产制造者承担往 urls 中加上原素,消費者承担从urls取下原素并开展抓取內容,根据这类方法开展协作,协作的媒体便是urls这一共享资源自变量。

共享资源自变量有2个 
urls :储放商品的器皿
is_finished :分辨生产制造者是不是转化成结束

上边生产制造者的转化成速率是一个消費者消費商品速率的10倍。因此假如将消費者进程从3个跳至10个,能够提升高效率,从16~18秒减缩到4~6秒。


尽管共享资源自变量能够完成进程间通讯,可是这类应用共享资源自变量通讯的方法是进程躁动不安全的。缘故非常简单,生产制造者和3个消費者这4个进程相互市场竞争应用urls这一个資源,极可能会造成urls被改乱导致数据信息不一致的难题。結果便是,消費者将会反复消費同一个商品或是一些商品未能append到urls中。

因此一般共享资源自变量要融合锁来确保进程市场竞争的应用这一資源自变量时是安全性井然有序的。



协作方法2:应用Queue序列

Queue对比于一般的list构造来讲,Queue是进程安全性的,而list并不是进程安全性的。缘故是Queue內部应用了锁和标准自变量来开展进程同歩,可是list沒有采用进程同歩技术性。
 

# coding=utf-8
from threading import Thread
from time import sleep,time
from queue import Queue

def get_detail_content(urls,name):

    while True:
        url = urls.get()    # 当序列为空时,get会堵塞进程,进程进到休眠状态,直至序列有原素才会被唤起
        sleep(0.001)    # 抓取一个url花0.00一秒


def get_detail_url(urls):


    page_url_num = 100
    start_id = 1
    while start_id 10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id): # 假定有100页目录页,每一页有一百个url,共10000个url
&" % id
            urls.put(url)   # 假如序列满了,put会堵塞进程,进程进到休眠状态,直至序列有时间间了,进程才被唤起
        sleep(0.01)    # 爬一个目录页花0.0一秒
        start_id = end_id

if __name__ == "__main__":
    st = time()
    print("主进程承担记时")
    # 建立一个生产制造者进程
    producer = Thread(target=get_detail_url,args=(urls,))
    consumers = []
    for i in range(10):  # 建立10个消費者进程
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumers.append(consumer)

    print("每日任务完毕,用时:%.2f" % (et-st))


这一事例对比于以前的应用非进程安全性的共享资源自变量来讲,queue是确保了进程安全性的。

可是这一事例有一个小缺点:当全部每日任务消費完以后,全部消費者进程都会实行urls.get()时进到堵塞。
这就造成主进程一直等候消費者进程完毕。
因此全部过程没法完毕,都不能复印担任务实行的時间。


大家能够略微改善一下:
构思以下:
1.设定过程不一待消費者进程和生产制造者进程实行完毕而完毕,因此对消費者和生产制造者进程应用 setDaemon(True) 设定为守卫进程
2.过程必须等候每日任务序列中的每日任务强制执行完才完毕,而且复印担任务用时,这儿可使用 Queue的join()方式

以下:

# coding=utf-8
from threading import Thread
from time import sleep,time
from queue import Queue
urls = Queue(500)  
#is_finished = False
def get_detail_content(urls,name):

    while True:
        url = urls.get()    
        sleep(0.001)    
        urls.task_done()        # 标识这一次取下来的url每日任务早已实行完


def get_detail_url(urls):


    page_url_num = 100
    start_id = 1
    while start_id 10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id): 
&" % id
            urls.put(url)  
        sleep(0.01)    
        start_id = end_id

if __name__ == "__main__":
    st = time()
    print("主进程承担记时")
    producer = Thread(target=get_detail_url,args=(urls,))
    producer.setDaemon(True)    # 设定为守卫进程
    consumers = []
    for i in range(10):  
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumer.setDaemon(True)    # 设定为守卫进程
        consumers.append(consumer)

    urls.join()     # 等候urls序列的每日任务强制执行完才向下实行         et = time()     print("每日任务完毕,用时:%.2f" % (et-st))

注解的地区便是干了改动的地区。

PS: Queue的join()方式务必相互配合task_done()方式一起应用!

Queue的join方式的唤起标准:
1当序列中常有每日任务被弹出来,序列中原素为0
2.每一个被弹出来的每日任务都实行了task_done()来标识这一每日任务已被进行

2个标准缺一不能

上边程序全过程以下: 
最先,全部进程刚开始运作,同时主进程被urls.join()堵塞
消費者不断消費商品,每消費完一个商品都是实行task_done()标识每一个每日任务完成
当消費者消費完全部每日任务,而且实行完全部每日任务,urls.join()会被唤起,同时需有消費者进程会实行到 urls.get()被堵塞。可是因为过程不一待这种守卫进程,因此主进程完毕了,全部进程都立即完毕。


上边这类应用Queue的join()方式只可用于生产制造者生产制造速率远大于消費者消費速率。倘若我将page_url_num = 100改成page_url_num = 5
生产制造速率从本来的0.0一秒100条变成0.0一秒5条,这一情况下10个消費者进程会立刻把序列中常有每日任务消費完而且每一个每日任务都用task_done标识为完成的每日任务,因此urls.join()被唤起,随后主进程完毕,生产制造者和消費者进程也完毕,可是生产制造者的10000个每日任务还没有有生产制造完呢!

以便处理这一难题,能够在生产制造者进程中做一个is_finished的全局性自变量标识,表明生产制造者是不是转化成结束。假如沒有生产制造结束,即便消費者进行了目前序列的全部每日任务,也会反复启用join()

# coding=utf-8
from threading import Thread
from time import sleep,time
from queue import Queue
urls = Queue(500)
is_finished = False     ####################
def get_detail_content(urls,name):

    while True:
        url = urls.get()
        sleep(0.001)
        urls.task_done()        # 标识这一次取下来的url每日任务早已实行完


def get_detail_url(urls):
    global is_finished      ####################


    page_url_num = 10
    start_id = 1
    while start_id 10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id):
&" % id
            urls.put(url)
        sleep(0.01)
        start_id = end_id
       
    is_finished=True        ####################

if __name__ == "__main__":
    st = time()
    print("主进程承担记时")
    producer = Thread(target=get_detail_url,args=(urls,))
    producer.setDaemon(True)    # 设定为守卫进程
    consumers = []
    for i in range(10):
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumer.setDaemon(True)    # 设定为守卫进程
        consumers.append(consumer)

    while not is_finished:      ######################         urls.join()     # 等候urls序列的每日任务强制执行完才向下实行     et = time()     print("每日任务完毕,用时:%.2f" % (et-st))


   
标识了 #################### 的地区是作出变动的地区
下边贴出来Queue中 join和task_done 的源代码:

class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
     
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished = 0:
                if unfinished 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished
    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

               
join是根据标准自变量完成的。
当 self.unfinished_tasks 超过0时便会进到堵塞。而 unfinished_tasks 会在启用put() 给序列加上原素时开展+1 实际操作。在启用 task_done() 时开展-1实际操作并检验 unfinished_tasks 是不是为0 ,假如为0就唤起join中的标准自变量。

编码段 小构件

张柏沛IT技术性blog > Python多段程和多过程(一) GIL锁和应用Thread建立多段程

点一下拷贝转截该一篇文章



联系我们

全国服务热线:4000-399-000 公司邮箱:343111187@qq.com

  工作日 9:00-18:00

关注我们

官网公众号

官网公众号

Copyright?2020 广州凡科互联网科技股份有限公司 版权所有 粤ICP备10235580号 客服热线 18720358503

技术支持:游戏抽奖