MENU

协程与asyncio

• April 8, 2019 • Read: 97 • python

最近搞了两天asyncio这个,感觉以后异步在python中只会越来越实用。但是,好难理解啊。

明明在学习的时候有很多不明白的地方,然后想写出来的时候又突然不知道写啥了。感觉吧,也就那么点东西,多练习一下就好了。
首先看的这个:http://python.jobbole.com/88291/ 。照葫芦画瓢撸了几行代码,前边的能看懂,yield一出来,感觉我的逻辑立马跟不上了。但是,前半部分理论知识讲的很好,起码能打好理论基础。
然后在imooc看的bobby老师的视频课,官方链接:https://coding.imooc.com/class/200.html 。闲鱼上话5块钱就能买到网盘版的2333。感觉bobby老师编程能力超级强,但是讲课水平真的一般,动不动就看源码,分析内存,入门小白谁扛得住啊。
imooc上bobby老师的那一门课就是照着《流畅的python》这本书讲的,然后我又找到了这本书的pdf版看了一下,好吧是我太菜了,看不太懂。我分享一下:链接:https://share.weiyun.com/5cEDIP0
入门还是得看廖雪峰,廖老师都把肉送到嘴边了2333 ,点此直达
还有之前说过的两个asyncio入门比较给力的:
手把手教你如何使用Python的异步IO框架:asyncio(上)
手把手教你如何使用Python的异步IO框架:asyncio(中)
感觉进程池和线程池还是需要再学习一下。

然后贴一波代码记录一下:

同步阻塞

import socket
import time
import concurrent.futures
def blocking_way():
    #创建套接字
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #发起链接请求
    sock.connect(('111.231.102.149',20000))
    #发送数据 windows下编码格式为gbk
    request='GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode('gbk')
    sock.send(request)
    #从sock上读取1K数据
    chunk=sock.recv(1024)
    respond=''
    while chunk:
        respond+=chunk.decode('gbk')
        chunk=sock.recv(1024)
    sock.close()
    return  respond
#同步阻塞
def sync_way():
    start_time_2 = time.time()
    res=[]
    for i in range(10):
        res.append(blocking_way())
        print('-'*20)
    return str(time.time() - start_time_2)
#多进程
def process_way():
    workers=10
    #这里使用concurrent.futures模块
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(workers) as executor:
        futures = [executor.submit(blocking_way) for item in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    return str(time.time() - start_time_2)
if __name__ == '__main__':
    # result=sync_way()
    result=process_way()
    print(result)

非阻塞

import socket
import time
import concurrent.futures
def blocking_way():
    #创建套接字
    sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #设置非阻塞
    sock.setblocking(False)
    #发起链接请求
    try:
        #非阻塞模式下connect会产生BlockingIOError异常
        sock.connect(('111.231.102.149',20000))
    except:
        pass
    #发送数据 windows下编码格式为gbk
    request='GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode('gbk')
    #在非阻塞模式下,调用API 后,例如send() 或recv()方法,如果遇到问题就会抛出异常。在阻塞模式下,遇到错误并不会阻止操作。
    while True:
    #sock.send(request)
    #抛出OSError
        try:
            sock.send(request)
            break
        except:
            #如果不再抛出异常,则表明发送完成
            pass
    #从sock上读取1K数据
    try:
        chunk=sock.recv(1024)
    except:
        chunk=''
    respond=''
    while chunk:
        respond+=chunk.decode('gbk')
        try:
            chunk=sock.recv(1024)
        except:
            chunk=b''
    sock.close()
    return  respond

def sync_way():
    start_time_2 = time.time()
    res=[]
    for i in range(10):
        res.append(blocking_way())
        print('-'*20)
    return str(time.time() - start_time_2)
if __name__ == '__main__':
    print(sync_way())

io复用

#io复用
import socket
import selectors
selector=selectors.DefaultSelector()
stopped=True

class crawler(object):
    def __init__(self,tmp):
        self.tmp=tmp
        self.sock=None
        self.response=''
    def con_and_reg(self):
        print('con_and_reg')
        self.sock=socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('111.231.102.149',20000))
        except BlockingIOError:
            pass

        #注册到epoll里,等待连接成功(链接成功之后 可以写入 数据到共享虚拟内存里)
        selector.register(self.sock.fileno(),selectors.EVENT_WRITE,self.connnected)

    def connnected(self,key_obj,event_mask):
        print('connected')
        print(key_obj)
        selector.unregister(key_obj.fd)
        request = 'GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode('gbk')
        self.sock.send(request)
        #可以读数据了(客户端已收到响应)
        selector.register(key_obj.fd,selectors.EVENT_READ,self.read_response)
        print('注册read_response')
    def read_response(self,key_obj,event_mask):
        print('read_response')
        print(key_obj)
        global  stopped
        chunk=self.sock.recv(1024)
        print('chunk')
        if chunk:
            self.response+=chunk.decode('gbk')
        else:
            selector.unregister(key_obj.fd)
            print('1'*20)
            print(self.response)
            print('2'*20)
            if self.tmp ==9:
                stopped=False


def event_loop():
    print('启用监听')
    while stopped:
        events=selector.select()
        for key_obj,events_mask in events:
            callback_func=key_obj.data
            callback_func(key_obj,events_mask)

if __name__ == '__main__':
    import time
    start_time_2 = time.time()
    for item in range(10):
        print(item)
        c=crawler(item)
        c.con_and_reg()

    event_loop()
    print(str(time.time() - start_time_2))

协程

import socket
import selectors
selector=selectors.DefaultSelector()
stopped=True
class Future(object):
    def __init__(self):
        self.result=None
        self._callback=[]
    def add_done_callback(self,func):
        self._callback.append(func)
    def set_result(self,result):
        self.result=result
        for func in self._callback:
            func()
class crawler(object):
    def __init__(self,tmp):
        self.tmp=tmp
        self.sock=None
        self.response=''
    def con_and_reg(self):
        print('con_and_reg')
        self.sock=socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('111.231.102.149',20000))
        except BlockingIOError:
            pass
        f=Future()
        #注册到epoll里,等待连接成功(链接成功之后 可以写入 数据到共享虚拟内存里)
        selector.register(self.sock.fileno(),selectors.EVENT_WRITE,self.connnected)

    def connnected(self,key_obj,event_mask):
        print('connected')
        print(key_obj)
        selector.unregister(key_obj.fd)
        request = 'GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode('gbk')
        self.sock.send(request)
        #可以读数据了(客户端已收到响应)
        selector.register(key_obj.fd,selectors.EVENT_READ,self.read_response)
        print('注册read_response')
    def read_response(self,key_obj,event_mask):
        print('read_response')
        print(key_obj)
        global  stopped
        chunk=self.sock.recv(1024)
        print('chunk')
        if chunk:
            self.response+=chunk.decode('gbk')
        else:
            selector.unregister(key_obj.fd)
            print('1'*20)
            print(self.response)
            print('2'*20)
            if self.tmp ==9:
                stopped=False

class Task(object):
    def __init__(self,coro):
        self.coro=coro


def event_loop():
    print('启用监听')
    while stopped:
        events=selector.select()
        for key_obj,events_mask in events:
            callback_func=key_obj.data
            callback_func(key_obj,events_mask)

if __name__ == '__main__':
    import time
    start_time_2 = time.time()
    for item in range(10):
        print(item)
        c=crawler(item)
        c.con_and_reg()

    event_loop()
    print(str(time.time() - start_time_2))

迭代器

    from collections.abc import Iterable,Iterator
    class Company(object):
        def __init__(self,employee_list):
            self.employee_list=employee_list
        def __getitem__(self, item):
            return self.employee_list[item]
    c=Company([1,2,3,4,5])
    
    print(isinstance(c,Iterator))
    print(isinstance(c,Iterable))
    
    for item in c:
        print(item)
    
    i=0
    while True:
        try:
            print(c.__getitem__(i))
        except IndexError:
            pass
        i=i+1
####迭代器进阶

from collections.abc import Iterable,Iterator
# class Company(object):
#     def __init__(self,employee_list):
#         self.employee_list=employee_list
#     def __iter__(self):
#         return MyIterator(self.employee_list)
# class MyIterator(object):
#     def __init__(self,employee_list):
#         self.employee_list=employee_list
#         self.index=0
#     def __next__(self):
#         try:
#             word=self.employee_list[self.index]
#         except IndexError:
#             raise StopIteration
#         self.index+=1
#         return word
# c=Company([1,2,3,4,5])
# print(isinstance(c,Iterator))
# print(isinstance(c,Iterable))
# for item in c:
#     print(item)
# I=MyIterator([1,2,3,4,5])
# print(isinstance(I,Iterable))
# print(isinstance(I,Iterator))
# c=iter(c)
# while True:
#     try:
#         print(next(c))
#     except StopIteration:
#         pass
class True_Iterator(object):
    def __init__(self,arg_list):
        self.arg_list=arg_list
        self.index=0
    def __iter__(self):
        return self
    def __next__(self):
        try:
            word=self.arg_list[self.index]
        except IndexError:
            pass
        self.index+=1
        return word
c=True_Iterator([1,2,3,4,5])
print(isinstance(c,Iterator))
print(isinstance(c,Iterable))
for item in c:
    print(item)

asyncio

import asyncio
async def tcp_demo(loop):
    print('demo   conn')
    reader,writer=await asyncio.open_connection('111.231.102.149',20000,loop=loop)
    print('demo   conn  end')
    request = 'GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode(
        'gbk')
    writer.write(request)
    print('demo data')
    data = await reader.read(1024)
    print('demo data end')
    #print(data.decode('gbk'),'demo')
    writer.close()
    await tcp_demo1(loop)

async def tcp_demo1(loop):
    print('demo1 conn')
    reader,writer=await asyncio.open_connection('111.231.102.149',20000,loop=loop)
    print('demo1 conn end')
    request = 'GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode(
        'gbk')
    writer.write(request)
    print('demo1 data')
    data = await reader.read(1024)
    print('demo1 data end')
    #print(data.decode('gbk'),'demo1')
    writer.close()
@asyncio.coroutine
def tcp_demo2(loop):
    print('demo2 conn')
    reader,writer=yield from asyncio.open_connection('111.231.102.149',20000,loop=loop)
    print('demo2 conn end')
    request = 'GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode(
        'gbk')
    writer.write(request)
    print('demo2 data')
    data = yield from reader.read(1024)
    print('demo2 data end')
    #print(data.decode('gbk'),'demo2')
    writer.close()
    yield from tcp_demo3(loop)
def tcp_demo3(loop):
    print('demo3 conn')
    reader,writer=yield from asyncio.open_connection('111.231.102.149',20000,loop=loop)
    print('demo3 conn end')
    request = 'GET /indexx.html HTTP/1.0 \n  user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.75 Safari/537.36'.encode(
        'gbk')
    writer.write(request)
    print('demo3 data')
    data = yield from reader.read(1024)
    print('demo3 data end')
    #print(data.decode('gbk'),'demo3')
    writer.close()
loop = asyncio.get_event_loop()
tasks = [tcp_demo(loop), tcp_demo2(loop)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Tags: python
Archives QR Code Tip
QR Code for this page
Tipping QR Code
Leave a Comment