欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > 完全用python 实现消息中间件2

完全用python 实现消息中间件2

2024/10/24 19:25:08 来源:https://blog.csdn.net/weixin_32759777/article/details/140904493  浏览:    关键词:完全用python 实现消息中间件2

为了完善这个简单的消息中间件,我们可以添加以下功能:

  1. 消息持久化:虽然在这个示例中我们不会使用数据库,但我们可以将消息保存到文件中,以模拟持久化存储。
  2. 消息确认:添加一个机制来确认消息已经被消费。
  3. 并发控制:确保在多线程或多进程环境中消息的安全处理。
    以下是更新后的代码:
from fastapi import FastAPI, HTTPException
from typing import Dict, List
import json
import os
from threading import Lock
app = FastAPI()
# 存储消息的字典,键为频道名,值为消息列表
channels: Dict[str, List[Dict[str, str]]] = {}
# 用于文件存储的路径
storage_path = "messages_storage"
# 确保存储路径存在
os.makedirs(storage_path, exist_ok=True)
# 消息锁,用于并发控制
lock = Lock()
# 消息结构示例
message_example = {"id": "message_id","content": "Hello, World!"
}
def save_message_to_file(channel: str, message: Dict[str, str]):# 将消息保存到文件中with open(os.path.join(storage_path, f"{channel}.json"), "a") as file:file.write(json.dumps(message) + "\n")
@app.post("/publish/{channel}")
async def publish_message(channel: str, message: Dict[str, str]):with lock:# 发布消息到指定频道if channel not in channels:channels[channel] = []channels[channel].append(message)save_message_to_file(channel, message)return {"message": "Message published successfully"}
@app.get("/consume/{channel}")
async def consume_message(channel: str):with lock:# 从指定频道消费消息if channel not in channels or not channels[channel]:raise HTTPException(status_code=404, detail="No messages available")# 返回并移除最新的一条消息message = channels[channel].pop(0)return message
@app.get("/messages/{channel}")
async def get_messages(channel: str):with lock:# 获取指定频道的所有消息if channel not in channels:raise HTTPException(status_code=404, detail="Channel not found")return {"channel": channel, "messages": channels[channel]}
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)

在这个版本中,我们添加了以下功能:

  • 消息持久化:发布消息时,将消息追加到对应频道的文件中。
  • 并发控制:使用Lock来确保在多线程环境中对消息列表的访问是线程安全的。
    请注意,这个示例仍然是一个非常简化的消息中间件,它没有实现完整的事务性、错误恢复、消息确认等高级功能。在实际的生产环境中,您可能需要使用数据库和更复杂的数据管理策略来实现这些功能。此外,对于高并发场景,您可能需要考虑使用异步文件操作或更高效的存储解决方案。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com