99偷拍视频精品区一区二,口述久久久久久久久久久久,国产精品夫妇激情啪发布,成人永久免费网站在线观看,国产精品高清免费在线,青青草在线观看视频观看,久久久久久国产一区,天天婷婷久久18禁,日韩动漫av在线播放直播

python協程阻塞函數,python 非阻塞線程

python協程(4):asyncio

asyncio是官方提供的協程的類庫,從python3.4開始支持該模塊

創新互聯公司長期為上1000+客戶提供的網站建設服務,團隊從業經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯網生態環境。為水城企業提供專業的成都做網站、網站建設,水城網站改版等技術服務。擁有十余年豐富建站經驗和眾多成功案例,為您定制開發。

async awiat是python3.5中引入的關鍵字,使用async關鍵字可以將一個函數定義為協程函數,使用awiat關鍵字可以在遇到IO的時候掛起當前協程(也就是任務),去執行其他協程。

await + 可等待的對象(協程對象、Future對象、Task對象 - IO等待)

注意:在python3.4中是通過asyncio裝飾器定義協程,在python3.8中已經移除了asyncio裝飾器。

事件循環,可以把他當做是一個while循環,這個while循環在周期性的運行并執行一些協程(任務),在特定條件下終止循環。

loop = asyncio.get_event_loop():生成一個事件循環

loop.run_until_complete(任務):將任務放到事件循環

Tasks用于并發調度協程,通過asyncio.create_task(協程對象)的方式創建Task對象,這樣可以讓協程加入事件循環中等待被調度執行。除了使用 asyncio.create_task() 函數以外,還可以用低層級的 loop.create_task() 或 ensure_future() 函數。不建議手動實例化 Task 對象。

本質上是將協程對象封裝成task對象,并將協程立即加入事件循環,同時追蹤協程的狀態。

注意:asyncio.create_task() 函數在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用 asyncio.ensure_future() 函數。

下面結合async awiat、事件循環和Task看一個示例

示例一:

*注意:python 3.7以后增加了asyncio.run(協程對象),效果等同于loop = asyncio.get_event_loop(),loop.run_until_complete(協程對象) *

示例二:

注意:asyncio.wait 源碼內部會對列表中的每個協程執行ensure_future從而封裝為Task對象,所以在和wait配合使用時task_list的值為[func(),func()] 也是可以的。

示例三:

Python協程之asyncio

asyncio 是 Python 中的異步IO庫,用來編寫并發協程,適用于IO阻塞且需要大量并發的場景,例如爬蟲、文件讀寫。

asyncio 在 Python3.4 被引入,經過幾個版本的迭代,特性、語法糖均有了不同程度的改進,這也使得不同版本的 Python 在 asyncio 的用法上各不相同,顯得有些雜亂,以前使用的時候也是本著能用就行的原則,在寫法上走了一些彎路,現在對 Python3.7+ 和 Python3.6 中 asyncio 的用法做一個梳理,以便以后能更好的使用。

協程,又稱微線程,它不被操作系統內核所管理,而完全是由程序控制,協程切換花銷小,因而有更高的性能。

協程可以比作子程序,不同的是,執行過程中協程可以掛起當前狀態,轉而執行其他協程,在適當的時候返回來接著執行,協程間的切換不需要涉及任何系統調用或任何阻塞調用,完全由協程調度器進行調度。

Python 中以 asyncio 為依賴,使用 async/await 語法進行協程的創建和使用,如下 async 語法創建一個協程函數:

在協程中除了普通函數的功能外最主要的作用就是:使用 await 語法等待另一個協程結束,這將掛起當前協程,直到另一個協程產生結果再繼續執行:

asyncio.sleep() 是 asyncio 包內置的協程函數,這里模擬耗時的IO操作,上面這個協程執行到這一句會掛起當前協程而去執行其他協程,直到sleep結束,當有多個協程任務時,這種切換會讓它們的IO操作并行處理。

注意,執行一個協程函數并不會真正的運行它,而是會返回一個協程對象,要使協程真正的運行,需要將它們加入到事件循環中運行,官方建議 asyncio 程序應當有一個主入口協程,用來管理所有其他的協程任務:

在 Python3.7+ 中,運行這個 asyncio 程序只需要一句: asyncio.run(main()) ,而在 Python3.6 中,需要手動獲取事件循環并加入協程任務:

事件循環就是一個循環隊列,對其中的協程進行調度執行,當把一個協程加入循環,這個協程創建的其他協程都會自動加入到當前事件循環中。

其實協程對象也不是直接運行,而是被封裝成一個個待執行的 Task ,大多數情況下 asyncio 會幫我們進行封裝,我們也可以提前自行封裝 Task 來獲得對協程更多的控制權,注意,封裝 Task 需要 當前線程有正在運行的事件循環 ,否則將引 RuntimeError,這也就是官方建議使用主入口協程的原因,如果在主入口協程之外創建任務就需要先手動獲取事件循環然后使用底層方法 loop.create_task() ,而在主入口協程之內是一定有正在運行的循環的。任務創建后便有了狀態,可以查看運行情況,查看結果,取消任務等:

asyncio.create_task() 是 Python3.7 加入的高層級API,在 Python3.6,需要使用低層級API asyncio.ensure_future() 來創建 Future,Future 也是一個管理協程運行狀態的對象,與 Task 沒有本質上的區別。

通常,一個含有一系列并發協程的程序寫法如下(Python3.7+):

并發運行多個協程任務的關鍵就是 asyncio.gather(*tasks) ,它接受多個協程任務并將它們加入到事件循環,所有任務都運行完成后會返回結果列表,這里我們也沒有手動封裝 Task,因為 gather 函數會自動封裝。

并發運行還有另一個方法 asyncio.wait(tasks) ,它們的區別是:

Python異步編程4:協程函數,協程對象,await關鍵字

協程函數:async def?函數名。3.5+

協程對象:執行協程函數()得到的協程對象。

3.5之后的寫法:

3.7之后的寫法:更簡便

await后面?跟?可等待的對象。(協程對象,Future,Task對象?約等于IO等待)

await實例2:串行執行。 一個協程函數里面可以支持多個await ,雖然會串行,但是如果有其他協程函數,任務列表也在執行,依然會切換。只是案例中的main對應執行的others1和others2串行 。 await會等待對象的值得到之后才繼續往下走。

python協程和異步IO——IO多路復用

C10k是一個在1999年被提出來的技術挑戰,如何在一顆1GHz CPU,2G內存,1gbps網絡環境下,讓單臺服務器同時為1萬個客戶端提供FTP服務

阻塞式I/O(使用最多)、非阻塞式I/O、I/O復用、信號驅動式I/O(幾乎不使用)、異步I/O(POSIX的aio_系列函數)

select、poll、epoll都是IO多路復用的機制。I/O多路復用就是通過一種機制,一個進程可以監聽多個描述符,一旦,某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作。但select、poll、epoll本質上都是同步I/O,因為他們都需要在讀寫時間就緒后負責進行讀寫,也就是說讀寫過程是阻塞的,而異步I/O無需自己負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間

(1)select

select函數監視的文件描述符分3類,分別是writefds、readfds、exceptfds。調用select函數會阻塞,直到有描述符就緒(有數據可讀、可寫或者有except),或者超時函數返回。當select函數返回后可以通過遍歷fdset來找到就緒的描述符。

select目前幾乎在所有的平臺上支持,其良好的跨平臺支持也是它的一個優點。select的一個缺點在于單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,可以通過修改宏定義甚至重新編譯內核的方式提升這一限制,但是這樣也會降低效率。

(2)poll

不同于select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現。

pollfd結構包含了要監視的event和發生的event,不再使用select"參數-值"傳遞的方式。同時pollfd并沒有最大數量限制(但是數量過大后性能也會下降)。和select函數一樣,poll返回后,需要輪詢pollfd來獲取就緒的描述符。

從上面看,select和poll都需要在返回后通過遍歷文件描述符來獲取已經就緒的socket。事實上同時連接的大量客戶端在同一時刻可能只有很少的處于就緒的狀態,因此隨著監視的描述符數量的增長,其效率也會線性下降

(3)epoll

epoll是在2.6內核中提出的,是之前的select和poll的增強版本。相對于select和poll來說,epoll更加領靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關系的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。

python里并發執行協程時部分阻塞超時怎么辦

在前面的例子里學習了并發地執行多個協程來下載圖片,也許其中一個協程永遠下載不了,一直阻塞,這時怎么辦呢?

碰到這種需求時不要驚慌,可以使用wait()里的timeout參數來設置等待時間,也就是從這個函數開始運行算起,如果時間到達協程沒有執行完成,就可以不再等它們了,直接從wait()函數里返回,返回之后就可以判斷那些沒有執行成功的,可以把這些協程取消掉。例子如下:

[python]?view plain?copy

import?asyncio

async?def?phase(i):

print('in?phase?{}'.format(i))

try:

await?asyncio.sleep(0.1?*?i)

except?asyncio.CancelledError:

print('phase?{}?canceled'.format(i))

raise

else:

print('done?with?phase?{}'.format(i))

return?'phase?{}?result'.format(i)

async?def?main(num_phases):

print('starting?main')

phases?=?[

phase(i)

for?i?in?range(num_phases)

]

print('waiting?0.1?for?phases?to?complete')

completed,?pending?=?await?asyncio.wait(phases,?timeout=0.1)

print('{}?completed?and?{}?pending'.format(

len(completed),?len(pending),

))

#?Cancel?remaining?tasks?so?they?do?not?generate?errors

#?as?we?exit?without?finishing?them.

if?pending:

print('canceling?tasks')

for?t?in?pending:

t.cancel()

print('exiting?main')

event_loop?=?asyncio.get_event_loop()

try:

event_loop.run_until_complete(main(3))

finally:

event_loop.close()

結果輸出如下:

starting main

waiting 0.1 for phases to complete

in phase 0

in phase 2

in phase 1

done with phase 0

1 completed and 2 pending

canceling tasks

exiting main

phase 1 canceled

phase 2 canceled

python2.7怎么實現異步

改進之前

之前,我的查詢步驟很簡單,就是:

前端提交查詢請求 -- 建立數據庫連接 -- 新建游標 -- 執行命令 -- 接受結果 -- 關閉游標、連接

這幾大步驟的順序執行。

這里面當然問題很大:

建立數據庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。

在“執行命令”和“接受結果”兩個步驟中,線程在阻塞在數據庫內部的運行過程中,數據庫連接和游標都處于閑置狀態。

這樣一來,每一次查詢都要順序的新建數據庫連接,都要阻塞在數據庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。

第一次改進

之前的模塊里,問題最大的就是第一步——建立數據庫連接套接字了。如果能夠一次性建立連接,之后查詢能夠反復服用這個連接就好了。

所以,首先應該把數據庫查詢模塊作為一個單獨的守護進程去執行,而前端app作為主進程響應用戶的點擊操作。那么兩條進程怎么傳遞消息呢?翻了幾天Python文檔,終于構思出來:用隊列queue作為生產者(web前端)向消費者(數據庫后端)傳遞任務的渠道。生產者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務完成后,回傳結果的渠道。確保,任務的接收方與發送方保持一致。

作為第二個問題的解決方法,可以使用線程池來并發獲取任務隊列中的task,然后執行命令并回傳結果。

第二次改進

第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。

但是對于第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至會出現“抖動”問題(也就是剛剛喚醒一個線程,就進入掛起狀態,剛剛換到棧幀或內存的上下文,又被換回內存或者磁盤),效率大大降低。也就是說,線程池的并發量很有限。

試過了多進程、多線程,只能在單個線程里做文章了。

Python中的asyncio庫

Python里有大量的協程庫可以實現單線程內的并發操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現協程并發。asyncio庫大大降低了Python中協程的實現難度,就像定義普通函數那樣就可以了,只是要在def前面多加一個async關鍵詞。async def函數中,需要阻塞在其他async def函數的位置前面可以加上await關鍵詞。

import asyncio

async def wait():

await asyncio.sleep(2)

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函數的執行稍微麻煩點。需要首先獲取一個loop對象,然后由這個對象代為執行async def函數。

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在執行execute(task)函數時,如果遇到await關鍵字,就會暫時掛起當前協程,轉而去執行其他阻塞在await關鍵詞的協程,從而實現協程并發。

不過需要注意的是,run_until_complete()函數本身是一個阻塞函數。也就是說,當前線程會等候一個run_until_complete()函數執行完畢之后,才會繼續執行下一部函數。所以下面這段代碼并不能并發執行。

for task in task_list:

loop.run_until_complete(task)

對與這個問題,asyncio庫也有相應的解決方案:gather函數。

loop = asyncio.get_event_loop()

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

當然了,async def函數的執行并不只有這兩種解決方案,還有call_soon與run_forever的配合執行等等,更多內容還請參考官方文檔。

Python下的I/O多路復用

協程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。

目前,Linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。

在Linux里查文檔,可以看到epoll只有三類函數,調用起來比較方便易懂。

創建epoll對象,并返回其對應的文件描述符(file descriptor)。

int epoll_create(int size);

int epoll_create1(int flags);

控制監聽事件。第一個參數epfd就對應于前面命令創建的epoll對象的文件描述符;第二個參數表示該命令要執行的動作:監聽事件的新增、修改或者刪除;第三個參數,是要監聽的文件對應的描述符;第四個,代表要監聽的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

等候。這是一個阻塞函數,調用者會等候內核通知所注冊的事件被觸發。

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask);

在Python的select庫里:

select.epoll()對應于第一類創建函數;

epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數epoll_ctl的封裝;

epoll.poll()則是對等候函數epoll_wait的封裝。

Python里epoll相關API的最大問題應該是在epoll.poll()。相比于其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是后者的第二個參數struct epoll_event *events。沒法實現精確控制。因此只能使用替代方案:select.select()函數。

根據Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統中select函數的直接調用,與C語言API的傳參很接近。前三個參數都是列表,其中的元素都是要注冊到內核的文件描述符。如果想用自定義類,就要確保實現了fileno()方法。

其分別對應于:

rlist: 等候直到可讀

wlist: 等候直到可寫

xlist: 等候直到異常。這個異常的定義,要查看系統文檔。

select.select(),類似于epoll.poll(),先注冊文件和事件,然后保持等候內核通知,是阻塞函數。

實際應用

Psycopg2庫支持對異步和協程,但和一般情況下的用法略有區別。普通數據庫連接支持不同線程中的不同游標并發查詢;而異步連接則不支持不同游標的同時查詢。所以異步連接的不同游標之間必須使用I/O復用方法來協調調度。

所以,我的大致實現思路是這樣的:首先并發執行大量協程,從任務隊列中提取任務,再向連接池請求連接,創建游標,然后執行命令,并返回結果。在獲取游標和接受查詢結果之前,均要阻塞等候內核通知連接可用。

其中,連接池返回連接時,會根據引用連接的協程數量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。

我的代碼位于:bottle-blog/dbservice.py

存在問題

當然了,這個流程目前還一些問題。

首先就是每次輪詢拿到任務之后,都會走這么一個流程。

獲取連接 -- 新建游標 -- 執行任務 -- 關閉游標 -- 取消連接引用

本來,最好的情況應該是:在輪詢之前,就建好游標;在輪詢時,直接等候內核通知,執行相應任務。這樣可以減少輪詢時的任務量。但是如果協程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。

所以這一塊,還有工作要做。

還有就是epoll沒能用上,有些遺憾。

以后打算寫點C語言的內容,或者用Python/C API,或者用Ctypes包裝共享庫,來實現epoll的調用。

最后,請允許我吐槽一下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。

本文名稱:python協程阻塞函數,python 非阻塞線程
文章轉載:http://www.yijiale78.com/article16/hsehgg.html

成都網站建設公司_創新互聯,為您提供品牌網站設計定制開發網站排名外貿網站建設App開發

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

微信小程序開發