18.5.4. Transports and protocols (callback based API)

18.5.4.1. Transports

Transports是由asyncio提供的类,以便抽象各种类型的通信通道。你一般不会自己实例化Transports;相反,您将调用AbstractEventLoop方法,该方法将创建传输并尝试启动基础通信通道,当它成功时调用您。

一旦建立了通信信道,传输总是与protocol实例配对。然后protocol可以出于各种目的调用transport的方法。

asyncio当前实现TCP,UDP,SSL和子进程管道的传输。transport上可用的方法取决于transport的种类。

Transports类不是线程安全的

18.5.4.1.1. BaseTransport 基类Transport

class asyncio.BaseTransport

BaseTransport基类。

close(self)

关闭Transport。如果Transport对象具有用于传出数据的缓冲区,则缓冲的数据将被异步刷新。也不再接收数据。在所有缓冲数据被刷新之后,Transport对象的connection_lost()方法将None作为其参数被调用。

is_closing(self)

如果Transport正在关闭或已经关闭,则返回True

版本3.5.1中的新功能。

get_extra_info(name, default=None)

返回可选的Transport对象信息。name是表示要获取的传输特定信息的字符串,如果信息不存在则返回default的值。

这种方法允许transport更容易地暴露通道特定的信息。

在3.5.1版更改: 'ssl_object'信息已添加到SSL套接字。

18.5.4.1.2. ReadTransport

class asyncio.ReadTransport

只读transport接口。

pause_reading()

transport暂停接收。在调用resume_reading()之前,不会将任何数据传递到协议的data_received()

resume_reading()

恢复数据接收。如果一些数据可用于读取,则将再次调用协议的data_received()方法。

18.5.4.1.3. WriteTransport

class asyncio.WriteTransport

只写transport接口。

abort()

立即关闭Transport对象,无需等待未知的操作完成。缓冲数据将丢失。数据也不会再被接收。Transport对象的connection_lost()方法最终将以None作为参数进行调用。

can_write_eof()

如果Transport对象支持write_eof()方法,则返回True,否则返回False

get_write_buffer_size()

返回Transport对象所使用的输出缓冲区的当前大小。

get_write_buffer_limits()

得到写缓冲的位和位的值。返回元组(低, 高)其中是确定的的字节数。

使用set_write_buffer_limits()函数设置写缓冲的高低位限制。

版本3.4.2中的新功能。

set_write_buffer_limits(high=None, low=None)

设置写入流控制的水限制。

这两个值控制调用协议的pause_writing()resume_writing()方法的时间。如果指定,低水限制必须小于或等于高水限制。都可以为负。

默认值是特定于实现的。如果仅给出了高水位限制,则低水位限制默认为小于或等于高水位限制的实施特定值。设置为零强制为零,并导致每当缓冲区变为非空时调用pause_writing()设置为零会导致resume_writing()仅在缓冲区为空时调用。对任一限制使用零通常是次优的,因为它减少了同时进行I / O和计算的机会。

使用get_write_buffer_limits()获取限制。

write(data)

将一些数据字节写入Transport对象。

本方法不阻塞;数据将被异步发送。

writelines(list_of_data)

将列表(或任何写迭代对象)写入到Transport对象中。这在功能上等同于对由迭代器产生的每个元素调用write(),但是可以更有效地实现。

write_eof()

在刷新缓冲的数据后关闭transport。但仍可接收数据。

此方法可能引发NotImplementedError如果transport不支持的话(例如,SSL)不支持半关闭。

18.5.4.1.4. DatagramTransport UDP数据报Transport

DatagramTransport.sendto(data, addr=None)

数据字节发送到由addr(传输相关的目标地址)给出的远程对等体。如果addrNone,则将数据发送到在传输创建时给定的目标地址。

这个方法不阻塞;它缓冲数据并安排它异步发送。

DatagramTransport.abort()

立即关闭运输,无需等待待完成的操作。缓冲数据将丢失。不会收到更多的数据。协议的connection_lost()方法最终将以None作为参数进行调用。

18.5.4.1.5. BaseSubprocessTransport 子进程Transport

class asyncio.BaseSubprocessTransport
get_pid()

返回子进程的进程ID。

get_pipe_transport(fd)

返回与整数文件对应的通信管道的传输描述器fd

  • 0: readable streaming transport of the standard input (stdin), or None if the subprocess was not created with stdin=PIPE
  • 1: writable streaming transport of the standard output (stdout), or None if the subprocess was not created with stdout=PIPE
  • 2: writable streaming transport of the standard error (stderr), or None if the subprocess was not created with stderr=PIPE
  • 其他fdNone
get_returncode()

subprocess.Popen.returncode属性类似,返回子进程returncode作为整数或None(如果未返回)。

kill(self)

杀死子进程,类似subprocess.Popen.kill()

在POSIX系统上,函数将发送SIGKILL信号到子进程。在Windows上,此方法是terminate()的别名。

send_signal(signal)

信号号发送到子过程,如subprocess.Popen.send_signal()中所示。

terminate()

请求子进程停止,如subprocess.Popen.terminate()中。此方法是close()方法的别名。

在POSIX系统上,此方法将SIGTERM发送到子过程。在Windows上,调用Windows API函数TerminateProcess()以停止子过程。

close()

Ask the subprocess to stop by calling the terminate() method if the subprocess hasn’t returned yet, and close transports of all pipes (stdin, stdout and stderr).

18.5.4.2. Protocols

asyncio提供了可以子类化来实现网络协议的基类。这些类与transports(见下文)结合使用:协议解析输入数据,并要求输出数据的写入,而传输负责实际的I / O和缓冲。

当子类化协议类时,建议您覆盖某些方法。这些方法是可被回调的:它们将由传输器在某些事件产生时(例如当接收到一些数据时)被调用;你不应该自己调用他们,除非你正在重新实现一个Transport类。

注意

所有回调都有默认实现,它们是空的。因此,您只需要实现您关注的事件的回调方法。

18.5.4.2.1. Protocol classes

class asyncio.Protocol

用于实现流协议的基类(用于TCP和SSL Transport)。

class asyncio.DatagramProtocol

用于实现数据报协议的基类(用于UDP Transport).

class asyncio.SubprocessProtocol

用于实现与子进程(通过一组单向管道)通信的协议的基类。

18.5.4.2.2. Connection callbacks

以下回调可以在协议DatagramProtocolSubprocessProtocol实例上调用:

BaseProtocol.connection_made(transport)

连接完成时被调用。

transport参数是表示当前产生连接的Transport对象。您可以将您需要的一些内容存储起来(例如存到一些对象的属性中),如果你需要话。

BaseProtocol.connection_lost(exc)

在连接丢失或关闭时调用。

参数是异常对象或None后者意味着接收到正常的EOF,或者连接被连接的这一侧中止或关闭。

connection_made()connection_lost()每次成功连接只调用一次。所有其他回调将在这两种方法之间被调用,这允许在协议实现中更容易的资源管理。

以下回调只能在SubprocessProtocol实例上调用:

SubprocessProtocol.pipe_data_received(fd, data)

当子进程将数据写入其stdout或stderr管道时调用。fd是管道的整数文件描述器。data是包含数据的非空字节对象。

SubprocessProtocol.pipe_connection_lost(fd, exc)

当与子进程通信的其中一个管道关闭时调用。fd是关闭的整数文件描述器。

SubprocessProtocol.process_exited()

当子进程退出时调用。

18.5.4.2.3. Streaming protocols

Protocol实例上调用以下回调:

Protocol.data_received(data)

当接收到一些数据时调用。data是包含传入数据的非空字节对象。

注意

数据是否被缓冲,分块或重组取决于传输。一般来说,你不应该依赖于特定的语义,而是让你的解析通用和足够灵活。但是,始终以正确的顺序接收数据。

Protocol.eof_received()

当另一端发出信号时,它将不再发送任何数据(例如通过调用write_eof(),如果另一端也使用asyncio)。

此方法可能返回false值(包括None),在这种情况下,传输将关闭自身。相反,如果此方法返回一个真值,关闭传输由协议决定。由于默认实现返回None,它隐式关闭连接。

注意

某些传输(如SSL)不支持半关闭连接,在这种情况下,从此方法返回true将不会阻止关闭连接。

data_received()可在连接期间调用任意次数。但是,eof_received()最多被调用一次,如果调用,data_received()将不会被调用。

状态机:

18.5.4.2.4. 数据报协议

DatagramProtocol实例上调用以下回调。

DatagramProtocol.datagram_received(data, addr)

在接收到数据报时调用。data是包含传入数据的字节对象。addr是发送数据的对方的地址;确切的格式取决于Transport对象。

DatagramProtocol.error_received(exc)

在先前的发送或接收操作引发OSError时调用。excOSError实例。

该方法在罕见的条件下调用,当运输(例如,UDP)检测到数据报不能传递到其收件人。在许多情况下,不可传递的数据报将被静默丢弃。

18.5.4.2.5. Flow control callbacks

这些回调可以在ProtocolDatagramProtocolSubprocessProtocol实例上调用:

BaseProtocol.pause_writing()

当传输缓冲区超过高水位标记时调用。

BaseProtocol.resume_writing()

当Transport缓冲低于低位标记时调用。

pause_writing() and resume_writing() calls are paired – pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increases the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

注意

如果缓冲区大小等于高水位线,pause_writing()不会被调用 - 它必须严格超过。相反,当缓冲区大小等于或低于低水位标记时,调用resume_writing()这些结束条件对于确保当任一标记为零时都如预期的那样重要。

注意

在BSD系统(OS X,FreeBSD等)DatagramProtocol不支持流控制,因为写入太多数据包导致的发送失败不能被轻易检测到。套接字总是出现“就绪”,多余的包被丢弃;可能会或可能不会引发具有errno设置为errno.ENOBUFSOSError如果它被提升,它将被报告到DatagramProtocol.error_received()但是否则被忽略。

18.5.4.2.6. Coroutines and protocols

协程可以使用ensure_future()在协议方法中调度,但不能保证执行顺序。协议不知道在协议方法中创建协程,因此不会等待它们。

要具有可靠的执行顺序,请在协议中使用stream objects产生 例如,StreamWriter.drain()协程可以用于等待直到写入缓冲区被刷新。

18.5.4.3. Protocol示例

18.5.4.3.1. TCP回显客户端协议

TCP echo客户端使用AbstractEventLoop.create_connection()方法,发送数据并等待连接关闭:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

事件循环运行两次。在这个简短的例子中,优先使用run_until_complete()方法来引发异常,如果服务器没有监听,而不必写一个短的协程来处理异常并停止运行循环。run_until_complete()退出时,循环不再运行,因此在发生错误时不需要停止循环。

也可以看看

TCP echo client using streams示例使用asyncio.open_connection()函数。

18.5.4.3.2. TCP回显服务器协议

TCP回显服务器使用AbstractEventLoop.create_server()方法,发送回接收到的数据并关闭连接:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close()可以在WriteTransport.write()之后立即调用,即使数据尚未在套接字上发送:两种方法都是异步的。不需要yield ,因为这些传输方法不是协程。

也可以看看

TCP echo server using streams示例使用asyncio.start_server()函数。

18.5.4.3.3. UDP echo客户端协议

UDP echo客户端使用AbstractEventLoop.create_datagram_endpoint()方法,当我们收到答案时发送数据并关闭传输:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4. UDP echo服务器协议

UDP echo服务器使用AbstractEventLoop.create_datagram_endpoint()方法,发送回接收到的数据:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5. Register an open socket to wait for data using a protocol

等待套接字使用协议使用AbstractEventLoop.create_connection()方法接收数据,然后关闭事件循环

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

None

watch a file descriptor for read events示例使用低级AbstractEventLoop.add_reader()方法注册套接字的文件描述器。

register an open socket to wait for data using streams示例使用协程中的open_connection()函数创建的高级流。