Python中的进程间通信
- 2025-01-10 08:47:00
- admin 原创
- 101
问题描述:
在两个独立的 Python 运行时之间进行通信的最佳方法是什么?尝试过的方法:
在命名管道上读取/写入例如
os.mkfifo
(感觉很黑客)dbus
服务(适用于桌面,但对于无头系统来说太重了)套接字(似乎太低级了;肯定有一个更高级的模块可以使用?)
我的基本要求是能够python listen.py
像守护进程一样运行,能够从接收消息python client.py
。客户端只需向现有进程发送一条消息并终止,0
成功时返回代码,失败时返回非零(即需要双向通信)。
解决方案 1:
该multiprocessing
库提供了包装套接字的监听器和客户端,并允许您传递任意的 python 对象。
你的服务器可以监听接收python对象:
from multiprocessing.connection import Listener
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey=b'secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
while True:
msg = conn.recv()
# do something with msg
if msg == 'close':
conn.close()
break
listener.close()
您的客户端可以将命令作为对象发送:
from multiprocessing.connection import Client
address = ('localhost', 6000)
conn = Client(address, authkey=b'secret password')
conn.send('close')
# can also send arbitrary objects:
# conn.send(['a', 2.5, None, int, sum])
conn.close()
解决方案 2:
不,zeromq才是最佳选择。很赞,不是吗?
import argparse
import zmq
parser = argparse.ArgumentParser(description='zeromq server/client')
parser.add_argument('--bar')
args = parser.parse_args()
if args.bar:
# client
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:5555')
socket.send_string(args.bar)
msg = socket.recv_string()
print msg
else:
# server
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:5555')
while True:
msg = socket.recv_string()
if msg == 'zeromq':
socket.send_string('ah ha!')
else:
socket.send_string('...nah')
解决方案 3:
根据@vsekhar的回答,这里有一个包含更多细节和多个连接的Python 3版本:
服务器
from multiprocessing.connection import Listener
listener = Listener(('localhost', 6000), authkey=b'secret password')
running = True
while running:
conn = listener.accept()
print('connection accepted from', listener.last_accepted)
while True:
msg = conn.recv()
print(msg)
if msg == 'close connection':
conn.close()
break
if msg == 'close server':
conn.close()
running = False
break
listener.close()
客户
from multiprocessing.connection import Client
import time
# Client 1
conn = Client(('localhost', 6000), authkey=b'secret password')
conn.send('foo')
time.sleep(1)
conn.send('close connection')
conn.close()
time.sleep(1)
# Client 2
conn = Client(('localhost', 6000), authkey=b'secret password')
conn.send('bar')
conn.send('close server')
conn.close()
解决方案 4:
根据我的经验,rpyc
这是迄今为止最简单、最优雅的方法。
解决方案 5:
我会使用套接字;本地通信经过了高度优化,因此您不应该遇到性能问题,并且它使您能够在需要时将应用程序分发到不同的物理节点。
关于“低级”方法,您说得对。但您可以根据需要随时使用更高级别的包装器。XMLRPC可能是一个不错的选择,但对于您要执行的任务来说,它可能有点过头了。
Twisted提供了一些很好的协议简单实现,比如LineReceiver(用于简单的基于行的消息)或更优雅的 AMP(顺便说一下,它是标准化的并以不同的语言实现)。
解决方案 6:
查看名为 RabbitMQ 的跨平台库/服务器。对于双进程通信来说,它可能太重了,但如果您需要多进程或多代码库通信(使用各种不同的方式,例如一对多、队列等),那么这是一个不错的选择。
要求:
$ pip install pika
$ pip install bson # for sending binary content
$ sudo apt-get rabbitmq-server # ubuntu, see rabbitmq installation instructions for other platforms
发布者(发送数据):
import pika, time, bson, os
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
i = 0
while True:
data = {'msg': 'Hello %s' % i, b'data': os.urandom(2), 'some': bytes(bytearray(b'x00x0Fx98x24'))}
channel.basic_publish(exchange='logs', routing_key='', body=bson.dumps(data))
print("Sent", data)
i = i + 1
time.sleep(1)
connection.close()
订阅者(接收数据,可以是多个):
import pika, bson
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
data = bson.loads(body)
print("Received", data)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
示例基于https://www.rabbitmq.com/tutorials/tutorial-two-python.html
解决方案 7:
我会使用套接字,但使用 Twisted 为您提供一些抽象,并使事情变得简单。他们的简单 Echo 客户端/服务器示例是一个很好的起点。
您只需合并文件并根据传递的参数实例化并运行客户端或服务器。
解决方案 8:
我发现这个线程是 Python IPC 的第一批结果之一,但我正在寻找可以与AsyncIO一起运行的东西。我最终发现IPyC提供了很好的异步功能,所以我回到这里分享这个精华。IPyC 还支持同步实现。
可以从两个不同的进程中使用 IPyC 库,但这里有一个小示例,在同一个文件中有两个 asyncio 任务。它使用 TCP:9999 作为默认端口。
注意:此示例在 Python >=3.10 上崩溃unexpected keyword argument 'loop'
;这是由于 asyncio 中的接口更改造成的。我已使用 v3.9 进行了测试。
import asyncio
import ipyc
import json
import logging
logging.basicConfig(level=logging.INFO) # Set to DEBUG to see inner workings
# IPyC has support for custom (de)serialization; using json as example here
ipyc.IPyCSerialization.add_custom_serialization(list, json.dumps)
ipyc.IPyCSerialization.add_custom_deserialization(list, json.loads)
## Host stuff
host = ipyc.AsyncIPyCHost()
@host.on_connect
async def on_client_connect(connection: ipyc.AsyncIPyCLink):
logging.info("Got a connection")
while connection.is_active():
message = await connection.receive()
if message:
logging.info(f"Received: {message}")
logging.info("Connection closed")
## Client stuff
async def client_task():
client = ipyc.AsyncIPyCClient()
link = await client.connect()
for i in range(3):
await link.send(["Hello World!", i, 3.14])
await asyncio.sleep(1)
await client.close() # Close the connection
await asyncio.sleep(1)
## AsyncIO stuff
loop = asyncio.get_event_loop()
loop.create_task(host.start())
loop.run_until_complete(client_task())