18.5.5. Streams(基于协程的API)¶
18.5.5.1. Stream的函数¶
None
本模块中的顶层函数仅用作方便包装;有真正没有什么特别的,如果他们不做你想要的,随时复制他们的代码。
- coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, **kwds)¶ create_connection()
的包装器返回(rader,writer)对。reader返回的是一个
StreamReader
实例;writer是一个StreamWriter
实例。这些参数是
AbstractEventLoop.create_connection()
的所有常见参数,除了protocol_factory;最常见的是位置主机和端口,各种可选的关键字参数如下。其他可选的关键字参数是loop(设置事件循环实例使用)和limit(设置传递给
StreamReader
的缓冲区限制) 。此函数是coroutine。
- coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)¶ 启动一个socket服务器,为每个连接的客户端产生一个回调。返回值与
create_server()
相同。使用两个参数调用client_connected_cb参数:client_reader,client_writer。client_reader是
StreamReader
对象,而client_writer是StreamWriter
对象。client_connected_cb参数可以是普通回调函数或协程函数;如果它是协程功能,它将被自动转换为任务
。其余的参数是
create_server()
的所有常见参数,除了protocol_factory;最常见的是位置主机和端口,各种可选的关键字参数如下。其他可选的关键字参数是loop(设置事件循环实例使用)和limit(设置传递给
StreamReader
的缓冲区限制) 。此函数是协程。
- coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, **kwds)¶ create_unix_connection()
的包装器返回(读取器,写入器)对。有关返回值和其他详细信息的信息,请参见
open_connection()
。此函数是coroutine。
None
- coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)¶ 启动UNIX Domain Socket服务器,为每个连接的客户端回调一个。
有关返回值和其他详细信息的信息,请参见
start_server()
。此函数是coroutine。
可用性:UNIX。
18.5.5.2. StreamReader ¶
- class
asyncio.
StreamReader
(limit=None, loop=None)¶ 此类为不是线程安全的。
-
exception
()¶ None
-
feed_eof
()¶ 确认EOF。
-
feed_data
(data)¶ 在内部缓冲区中输入数据字节。将恢复等待数据的任何操作。
-
set_exception
(exc)¶ None
-
set_transport
(transport)¶ 设置传输。
- coroutine
read
(n=-1)¶ 读取最多n个字节。如果未提供n或设置为
-1
,请读取直到EOF并返回所有读取的字节。如果接收到EOF并且内部缓冲区为空,则返回一个空的
bytes
对象。此方法是coroutine。
- coroutine
readline
()¶ 读取一行,其中“line”是以
\n
结尾的字节序列。如果接收到EOF,并且未找到
\n
,则该方法将返回部分读取字节。如果接收到EOF并且内部缓冲区为空,则返回一个空的
bytes
对象。此方法是coroutine。
- coroutine
readexactly
(n)¶ 读取n字节。如果读取n之前到达流的末尾,则引发
IncompleteReadError
,异常的IncompleteReadError.partial
属性包含部分读字节。此方法是协程。
- coroutine
readuntil
(separator=b'n')¶ 从流中读取数据,直到找到
separator
。成功时,数据和分隔符将从内部缓冲区(消耗)中删除。返回的数据将包括末尾的分隔符。
配置流限制用于检查结果。Limit设置可以返回的数据的最大长度,不计算分隔符。
如果发生EOF并且仍未找到完整的分隔符,则会引发
IncompleteReadError
异常,并且内部缓冲区将被重置。IncompleteReadError.partial
属性可能包含部分分隔符。如果由于超限而无法读取数据,则会引发
LimitOverrunError
异常,并且数据将保留在内部缓冲区中,因此可以再次读取。版本3.5.2中的新功能。
-
at_eof
()¶ 如果缓冲区为空且调用
feed_eof()
,则返回True
。
-
18.5.5.3. StreamWriter¶
- class
asyncio.
StreamWriter
(transport, protocol, reader, loop)¶ 包装运输。
这会使
write()
,writelines()
,can_write_eof()
,write_eof()
,get_extra_info()
和close()
。它添加drain()
,它返回可选的Future
,您可以等待流控制。它还添加了直接引用Transport
的传输属性。此类为not thread safe。
-
transport
¶ 运输。
-
can_write_eof
()¶ 如果传输支持
write_eof()
,False
,则返回True
请参见WriteTransport.can_write_eof()
。
-
close
()¶ 关闭传输:请参阅
BaseTransport.close()
。
- coroutine
drain
()¶ 让底层传输的写缓冲区有机会被刷新。
目的用途是写:
w.write(data) yield from w.drain()
当传输缓冲区的大小达到高水限值(协议暂停)时,阻塞直到缓冲区的大小下降到低水限值,并且协议恢复。当没有什么要等待,收益率继续立即。
从
drain()
得到的结果为循环提供了调度写入操作和刷新缓冲区的机会。尤其应当在可能大量的数据写入传输时使用,而协程不会从write()
的调用之间产生。此方法是coroutine。
-
get_extra_info
(name, default=None)¶ 返回可选的传输信息:请参阅
BaseTransport.get_extra_info()
。
-
write
(data)¶ 将一些数据字节写入传输:参见
WriteTransport.write()
。
-
writelines
(data)¶ 将数据字节的列表(或任何可迭代的)写入传输:参见
WriteTransport.writelines()
。
-
write_eof
()¶ 刷新缓冲数据后关闭传输的写入结束:请参阅
WriteTransport.write_eof()
。
-
18.5.5.4. StreamReaderProtocol¶
- class
asyncio.
StreamReaderProtocol
(stream_reader, client_connected_cb=None, loop=None)¶ Trivial助手类适用于
Protocol
和StreamReader
之间。Protocol
的子类。stream_reader is a
StreamReader
instance, client_connected_cb is an optional function called with (stream_reader, stream_writer) when a connection is made, loop is the event loop instance to use.(这是一个帮助类,而不是使
StreamReader
本身为Protocol
子类,因为StreamReader
有其他潜在用途,StreamReader
意外调用了协议的不当方法。)
18.5.5.5. IncompleteReadError¶
18.5.5.7. Stream examples¶
18.5.5.7.1. TCP echo client using streams¶
TCP回显客户端使用asyncio.open_connection()
函数:
import asyncio
@asyncio.coroutine
def tcp_echo_client(message, loop):
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = yield from reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()
18.5.5.7.2. TCP echo server using streams¶
TCP回显服务器使用asyncio.start_server()
函数:
import asyncio
@asyncio.coroutine
def handle_echo(reader, writer):
data = yield from reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))
print("Send: %r" % message)
writer.write(data)
yield from writer.drain()
print("Close the client socket")
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
18.5.5.7.3. 获取HTTP头¶
在命令行中获取URL的HTTP头的简单示例:
import asyncio
import urllib.parse
import sys
@asyncio.coroutine
def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
connect = asyncio.open_connection(url.hostname, 443, ssl=True)
else:
connect = asyncio.open_connection(url.hostname, 80)
reader, writer = yield from connect
query = ('HEAD {path} HTTP/1.0\r\n'
'Host: {hostname}\r\n'
'\r\n').format(path=url.path or '/', hostname=url.hostname)
writer.write(query.encode('latin-1'))
while True:
line = yield from reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print('HTTP header> %s' % line)
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()
用法:
python example.py http://example.com/path/page.html
或使用HTTPS:
python example.py https://example.com/path/page.html
18.5.5.7.4. Register an open socket to wait for data using streams¶
协程等待,直到套接字使用open_connection()
函数接收数据:
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
@asyncio.coroutine
def wait_for_data(loop):
# Create a pair of connected sockets
rsock, wsock = socketpair()
# Register the open socket to wait for data
reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = yield from reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()
也可以看看
register an open socket to wait for data using a protocol示例使用由AbstractEventLoop.create_connection()
方法创建的低级协议注册开放套接字以等待数据。
watch a file descriptor for read events示例使用低级AbstractEventLoop.add_reader()
方法注册套接字的文件描述器。