18.5.4. Transports and protocols (callback based API)¶
18.5.4.1. Transports¶
Transports是由asyncio
提供的类,以便抽象各种类型的通信通道。你一般不会自己实例化Transports;相反,您将调用AbstractEventLoop
方法,该方法将创建传输并尝试启动基础通信通道,当它成功时调用您。
一旦建立了通信信道,传输总是与protocol实例配对。然后protocol可以出于各种目的调用transport的方法。
asyncio
当前实现TCP,UDP,SSL和子进程管道的传输。transport上可用的方法取决于transport的种类。
Transports类不是线程安全的。
18.5.4.1.1. BaseTransport ¶基类Transport
- class
asyncio.
BaseTransport
¶ BaseTransport基类。
-
close
(self)¶ 关闭Transport。如果Transport对象具有用于传出数据的缓冲区,则缓冲的数据将被异步刷新。也不再接收数据。在所有缓冲数据被刷新之后,Transport对象的
connection_lost()
方法将None
作为其参数被调用。
-
is_closing
(self)¶ 如果Transport正在关闭或已经关闭,则返回
True
。版本3.5.1中的新功能。
-
get_extra_info
(name, default=None)¶ 返回可选的Transport对象信息。name是表示要获取的传输特定信息的字符串,如果信息不存在则返回default的值。
这种方法允许transport更容易地暴露通道特定的信息。
- 插座:
'peername'
:套接字连接的远程地址,socket.socket.getpeername()
的结果(None
'socket'
:socket.socket
实例'sockname'
:套接字自己的地址,socket.socket.getsockname()
- SSL套接字:
'compression'
:压缩算法用作字符串,或None
如果连接未压缩;ssl.SSLSocket.compression()
的结果'cipher'
:包含所使用的密码的名称,定义其使用的SSL协议的版本和正在使用的秘密位的数量的三值元组;ssl.SSLSocket.cipher()
的结果'peercert'
:对等证书;ssl.SSLSocket.getpeercert()
的结果'sslcontext'
:ssl.SSLContext
实例'ssl_object'
:ssl.SSLObject
或ssl.SSLSocket
实例
- 管:
'pipe'
:pipe对象
- 子过程:
'subprocess'
:subprocess.Popen
实例
- 插座:
在3.5.1版更改:
'ssl_object'
信息已添加到SSL套接字。-
18.5.4.1.2. ReadTransport¶
- class
asyncio.
ReadTransport
¶ 只读transport接口。
-
pause_reading
()¶ transport暂停接收。在调用
resume_reading()
之前,不会将任何数据传递到协议的data_received()
-
resume_reading
()¶ 恢复数据接收。如果一些数据可用于读取,则将再次调用协议的
data_received()
方法。
-
18.5.4.1.3. WriteTransport¶
- class
asyncio.
WriteTransport
¶ 只写transport接口。
-
abort
()¶ 立即关闭Transport对象,无需等待未知的操作完成。缓冲数据将丢失。数据也不会再被接收。Transport对象的
connection_lost()
方法最终将以None
作为参数进行调用。
-
can_write_eof
()¶ 如果Transport对象支持
write_eof()
方法,则返回True
,否则返回False
,
-
get_write_buffer_size
()¶ 返回Transport对象所使用的输出缓冲区的当前大小。
-
get_write_buffer_limits
()¶ 得到写缓冲的高位和低位的值。返回元组
(低, 高)
其中低和高是确定的的字节数。使用
set_write_buffer_limits()
函数设置写缓冲的高低位限制。版本3.4.2中的新功能。
-
set_write_buffer_limits
(high=None, low=None)¶ 设置写入流控制的高和低水限制。
这两个值控制调用协议的
pause_writing()
和resume_writing()
方法的时间。如果指定,低水限制必须小于或等于高水限制。高或低都可以为负。默认值是特定于实现的。如果仅给出了高水位限制,则低水位限制默认为小于或等于高水位限制的实施特定值。将高设置为零强制低为零,并导致每当缓冲区变为非空时调用
pause_writing()
。将低设置为零会导致resume_writing()
仅在缓冲区为空时调用。对任一限制使用零通常是次优的,因为它减少了同时进行I / O和计算的机会。使用
get_write_buffer_limits()
获取限制。
-
write
(data)¶ 将一些数据字节写入Transport对象。
本方法不阻塞;数据将被异步发送。
-
write_eof
()¶ 在刷新缓冲的数据后关闭transport。但仍可接收数据。
此方法可能引发
NotImplementedError
如果transport不支持的话(例如,SSL)不支持半关闭。
-
18.5.4.1.4. DatagramTransport¶ UDP数据报Transport
18.5.4.1.5. BaseSubprocessTransport ¶子进程Transport
- class
asyncio.
BaseSubprocessTransport
¶ -
get_pid
()¶ 返回子进程的进程ID。
-
get_pipe_transport
(fd)¶ 返回与整数文件对应的通信管道的传输描述器fd:
0
: readable streaming transport of the standard input (stdin), orNone
if the subprocess was not created withstdin=PIPE
1
: writable streaming transport of the standard output (stdout), orNone
if the subprocess was not created withstdout=PIPE
2
: writable streaming transport of the standard error (stderr), orNone
if the subprocess was not created withstderr=PIPE
- 其他fd:
None
-
get_returncode
()¶ 与
subprocess.Popen.returncode
属性类似,返回子进程returncode作为整数或None
(如果未返回)。
-
kill
(self)¶ 杀死子进程,类似
subprocess.Popen.kill()
。在POSIX系统上,函数将发送SIGKILL信号到子进程。在Windows上,此方法是
terminate()
的别名。
-
send_signal
(signal)¶ 将信号号发送到子过程,如
subprocess.Popen.send_signal()
中所示。
-
terminate
()¶ 请求子进程停止,如
subprocess.Popen.terminate()
中。此方法是close()
方法的别名。在POSIX系统上,此方法将SIGTERM发送到子过程。在Windows上,调用Windows API函数TerminateProcess()以停止子过程。
-
close
()¶ Ask the subprocess to stop by calling the
terminate()
method if the subprocess hasn’t returned yet, and close transports of all pipes (stdin, stdout and stderr).
-
18.5.4.2. Protocols¶
asyncio
提供了可以子类化来实现网络协议的基类。这些类与transports(见下文)结合使用:协议解析输入数据,并要求输出数据的写入,而传输负责实际的I / O和缓冲。
当子类化协议类时,建议您覆盖某些方法。这些方法是可被回调的:它们将由传输器在某些事件产生时(例如当接收到一些数据时)被调用;你不应该自己调用他们,除非你正在重新实现一个Transport类。
注意
所有回调都有默认实现,它们是空的。因此,您只需要实现您关注的事件的回调方法。
18.5.4.2.1. Protocol classes¶
- class
asyncio.
Protocol
¶ 用于实现流协议的基类(用于TCP和SSL Transport)。
- class
asyncio.
DatagramProtocol
¶ 用于实现数据报协议的基类(用于UDP Transport).
- class
asyncio.
SubprocessProtocol
¶ 用于实现与子进程(通过一组单向管道)通信的协议的基类。
18.5.4.2.2. Connection callbacks¶
以下回调可以在协议
,DatagramProtocol
和SubprocessProtocol
实例上调用:
-
BaseProtocol.
connection_made
(transport)¶ 连接完成时被调用。
transport参数是表示当前产生连接的Transport对象。您可以将您需要的一些内容存储起来(例如存到一些对象的属性中),如果你需要话。
connection_made()
和connection_lost()
每次成功连接只调用一次。所有其他回调将在这两种方法之间被调用,这允许在协议实现中更容易的资源管理。
以下回调只能在SubprocessProtocol
实例上调用:
-
SubprocessProtocol.
pipe_data_received
(fd, data)¶ 当子进程将数据写入其stdout或stderr管道时调用。fd是管道的整数文件描述器。data是包含数据的非空字节对象。
-
SubprocessProtocol.
pipe_connection_lost
(fd, exc)¶ 当与子进程通信的其中一个管道关闭时调用。fd是关闭的整数文件描述器。
-
SubprocessProtocol.
process_exited
()¶ 当子进程退出时调用。
18.5.4.2.3. Streaming protocols¶
在Protocol
实例上调用以下回调:
-
Protocol.
data_received
(data)¶ 当接收到一些数据时调用。data是包含传入数据的非空字节对象。
注意
数据是否被缓冲,分块或重组取决于传输。一般来说,你不应该依赖于特定的语义,而是让你的解析通用和足够灵活。但是,始终以正确的顺序接收数据。
-
Protocol.
eof_received
()¶ 当另一端发出信号时,它将不再发送任何数据(例如通过调用
write_eof()
,如果另一端也使用asyncio)。此方法可能返回false值(包括None),在这种情况下,传输将关闭自身。相反,如果此方法返回一个真值,关闭传输由协议决定。由于默认实现返回None,它隐式关闭连接。
注意
某些传输(如SSL)不支持半关闭连接,在这种情况下,从此方法返回true将不会阻止关闭连接。
data_received()
可在连接期间调用任意次数。但是,eof_received()
最多被调用一次,如果调用,data_received()
将不会被调用。
状态机:
18.5.4.2.4. 数据报协议¶
在DatagramProtocol
实例上调用以下回调。
-
DatagramProtocol.
datagram_received
(data, addr)¶ 在接收到数据报时调用。data是包含传入数据的字节对象。addr是发送数据的对方的地址;确切的格式取决于Transport对象。
18.5.4.2.5. Flow control callbacks¶
这些回调可以在Protocol
,DatagramProtocol
和SubprocessProtocol
实例上调用:
-
BaseProtocol.
pause_writing
()¶ 当传输缓冲区超过高水位标记时调用。
-
BaseProtocol.
resume_writing
()¶ 当Transport缓冲低于低位标记时调用。
pause_writing()
and resume_writing()
calls are paired – pause_writing()
is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increases the buffer size even more), and eventually resume_writing()
is called once when the buffer size reaches the low-water mark.
注意
如果缓冲区大小等于高水位线,pause_writing()
不会被调用 - 它必须严格超过。相反,当缓冲区大小等于或低于低水位标记时,调用resume_writing()
。这些结束条件对于确保当任一标记为零时都如预期的那样重要。
注意
在BSD系统(OS X,FreeBSD等)DatagramProtocol
不支持流控制,因为写入太多数据包导致的发送失败不能被轻易检测到。套接字总是出现“就绪”,多余的包被丢弃;可能会或可能不会引发具有errno设置为errno.ENOBUFS
的OSError
如果它被提升,它将被报告到DatagramProtocol.error_received()
但是否则被忽略。
18.5.4.2.6. Coroutines and protocols¶
协程可以使用ensure_future()
在协议方法中调度,但不能保证执行顺序。协议不知道在协议方法中创建协程,因此不会等待它们。
要具有可靠的执行顺序,请在协议中使用stream objects,产生 从
。例如,StreamWriter.drain()
协程可以用于等待直到写入缓冲区被刷新。
18.5.4.3. Protocol示例¶
18.5.4.3.1. TCP回显客户端协议¶
TCP echo客户端使用AbstractEventLoop.create_connection()
方法,发送数据并等待连接关闭:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
'127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
事件循环运行两次。在这个简短的例子中,优先使用run_until_complete()
方法来引发异常,如果服务器没有监听,而不必写一个短的协程来处理异常并停止运行循环。在run_until_complete()
退出时,循环不再运行,因此在发生错误时不需要停止循环。
18.5.4.3.2. TCP回显服务器协议¶
TCP回显服务器使用AbstractEventLoop.create_server()
方法,发送回接收到的数据并关闭连接:
import asyncio
class EchoServerClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Transport.close()
可以在WriteTransport.write()
之后立即调用,即使数据尚未在套接字上发送:两种方法都是异步的。不需要yield 从
,因为这些传输方法不是协程。
18.5.4.3.3. UDP echo客户端协议¶
UDP echo客户端使用AbstractEventLoop.create_datagram_endpoint()
方法,当我们收到答案时发送数据并关闭传输:
import asyncio
class EchoClientProtocol:
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Socket closed, stop the event loop")
loop = asyncio.get_event_loop()
loop.stop()
loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, loop),
remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()
18.5.4.3.4. UDP echo服务器协议¶
UDP echo服务器使用AbstractEventLoop.create_datagram_endpoint()
方法,发送回接收到的数据:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()
18.5.4.3.5. Register an open socket to wait for data using a protocol¶
等待套接字使用协议使用AbstractEventLoop.create_connection()
方法接收数据,然后关闭事件循环
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()
class MyProtocol(asyncio.Protocol):
transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport (it will call connection_lost())
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed, stop the event loop
loop.stop()
# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Run the event loop
loop.run_forever()
# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()
None
watch a file descriptor for read events示例使用低级AbstractEventLoop.add_reader()
方法注册套接字的文件描述器。
register an open socket to wait for data using streams示例使用协程中的open_connection()
函数创建的高级流。