欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > 火山引擎 大模型语音合成双向流式API-python demo

火山引擎 大模型语音合成双向流式API-python demo

2025/2/6 11:41:54 来源:https://blog.csdn.net/wuyongfan6589/article/details/143282488  浏览:    关键词:火山引擎 大模型语音合成双向流式API-python demo

在播放大模型生成内容时,可以实现边生成边播放的效果。这里可以使用火山引擎的双向流式语音合成。官方没有提供 Python 版本的 demo,且官方文档的表述实际上并不清晰,所以我在阅读 Go 语言版本后,自己写了一个,提供给大家。

官方文档 https://www.volcengine.com/docs/6561/1329505

代码

需要自行替换 APP_KEY和ACCESS_KEY

protocol.py

import json
import struct


class Event:
    """Event codes used by the Volcengine bidirectional streaming TTS protocol.

    Codes 1/100/102/200 are sent by the client; 5x/15x/35x codes are
    received from the server.
    """

    NONE = 0
    START_CONNECTION = 1
    FINISH_CONNECTION = 2
    CONNECTION_STARTED = 50
    CONNECTION_FAILED = 51
    CONNECTION_FINISHED = 52
    START_SESSION = 100
    FINISH_SESSION = 102
    SESSION_STARTED = 150
    SESSION_FINISHED = 152
    SESSION_FAILED = 153
    TASK_REQUEST = 200
    TTS_SENTENCE_START = 350
    TTS_SENTENCE_END = 351
    TTS_RESPONSE = 352


def _frame_header(event: int) -> bytearray:
    """Return the common 8-byte prefix shared by every client frame.

    Layout: 4 fixed header bytes, then the event code as a big-endian i32.
    Each header byte packs two nibbles (high / low).
    """
    frame = bytearray()
    frame.append(0b0001_0001)  # protocol version=1, header size=1 (in 4-byte units)
    frame.append(0b0001_0100)  # message type=full request, flags: event number present
    frame.append(0b0001_0000)  # serialization=JSON, compression=none
    frame.append(0b0000_0000)  # reserved
    frame.extend(struct.pack(">i", event))  # event type
    return frame


def _append_sized(frame: bytearray, data: bytes) -> None:
    """Append a big-endian u32 length prefix followed by *data*."""
    frame.extend(struct.pack(">I", len(data)))
    frame.extend(data)


def create_start_connection_frame() -> bytes:
    """Frame that opens the logical connection; payload is an empty JSON object."""
    frame = _frame_header(Event.START_CONNECTION)
    _append_sized(frame, json.dumps({}).encode())
    return bytes(frame)


def create_finish_connection_frame() -> bytes:
    """Frame that closes the logical connection; payload is an empty JSON object."""
    frame = _frame_header(Event.FINISH_CONNECTION)
    _append_sized(frame, b'{}')
    return bytes(frame)


def create_start_session_frame(session_id: str, speaker: str) -> bytes:
    """Frame that starts a TTS session requesting mp3 audio at 24 kHz.

    Args:
        session_id: Caller-chosen session identifier, echoed by the server.
        speaker: Voice type identifier to synthesize with.
    """
    b_meta_data_json = json.dumps(
        {
            # Was a hard-coded 100 in the original; same value, named for clarity.
            "event": Event.START_SESSION,
            "req_params": {
                "speaker": speaker,
                "audio_params": {"format": "mp3", "sample_rate": 24000},
            },
        },
        ensure_ascii=False,
    ).encode()
    frame = _frame_header(Event.START_SESSION)
    _append_sized(frame, session_id.encode())
    _append_sized(frame, b_meta_data_json)
    return bytes(frame)


def create_finish_session_frame(session_id: str) -> bytes:
    """Frame that ends the session identified by *session_id*."""
    frame = _frame_header(Event.FINISH_SESSION)
    _append_sized(frame, session_id.encode())
    _append_sized(frame, b'{}')
    return bytes(frame)


def create_task_request_frame(chunk: str, session_id: str) -> bytes:
    """Frame carrying one text chunk to synthesize within an open session.

    Note: this payload deliberately keeps json.dumps' default
    ensure_ascii=True, as in the original (unlike the session frame).
    """
    b_chunk_json = json.dumps(
        {
            "event": Event.TASK_REQUEST,
            "req_params": {
                "text": chunk,
            },
        }
    ).encode()
    frame = _frame_header(Event.TASK_REQUEST)
    _append_sized(frame, session_id.encode())
    _append_sized(frame, b_chunk_json)
    return bytes(frame)


def parse_frame(frame):
    """Decode one binary frame into its header fields, session id and payload.

    Args:
        frame: Raw frame bytes as received from the server.

    Returns:
        Dict with version, message_type, serialization_method,
        compression_method, event, session_id (or None) and raw payload bytes.

    Raises:
        ValueError: if *frame* is not a bytes object.
    """
    if not isinstance(frame, bytes):
        raise ValueError(f"frame is not bytes: {frame}")
    header = frame[:4]
    version = header[0] >> 4
    header_size = (header[0] & 0x0F) * 4  # header length in bytes
    message_type = header[1] >> 4
    flags = header[1] & 0x0F
    serialization_method = header[2] >> 4
    compression_method = header[2] & 0x0F
    event = struct.unpack(">I", frame[4:8])[0]
    payload_start = header_size
    if flags & 0x04:  # an event number follows the fixed header
        payload_start += 4
    # Full request/response and audio-only frames carry a session id section.
    if message_type in [0b0001, 0b1001, 0b1011]:
        session_id_len = struct.unpack(">I", frame[payload_start: payload_start + 4])[0]
        session_id = frame[payload_start + 4: payload_start + 4 + session_id_len].decode()
        payload_start += 4 + session_id_len
    else:
        session_id = None
    payload_len = struct.unpack(">I", frame[payload_start: payload_start + 4])[0]
    payload = frame[payload_start + 4: payload_start + 4 + payload_len]
    return {
        "version": version,
        "message_type": message_type,
        "serialization_method": serialization_method,
        "compression_method": compression_method,
        "event": event,
        "session_id": session_id,
        "payload": payload,
    }

client.py

import asyncio
import logging
import uuid
from typing import AsyncGeneratorimport websocketsclass TtsClient:DEFAULT_API_ENDPOINT = "wss://openspeech.bytedance.com/api/v3/tts/bidirection"def get_headers(self):return {"X-Api-App-Key": "your VOL_TTS_APP_KEY","X-Api-Access-Key": "your VOL_TTS_ACCESS_KEY,"X-Api-Resource-Id": "volc.service_type.10029","X-Api-Request-Id": uuid.uuid4(),}async def send_task_frame(self,session_id: str,text_generator: AsyncGenerator[str, None],ws: websockets.WebSocketClientProtocol,):async for chunk in text_generator:task_frame = protocol.send_task_frame(chunk=chunk, session_id=session_id)await ws.send(task_frame)await ws.send(protocol.finish_session_frame(session_id))async def receive_response(self, ws: websockets.WebSocketClientProtocol):while True:response = await ws.recv()frame = protocol.parse_frame(response)match frame["event"]:case protocol.Event.TTS_RESPONSE:yield frame["payload"]case protocol.Event.SESSION_FINISHED | protocol.Event.FINISH_CONNECTION:breakasync def a_duplex_tts(self, message_id: str, text_generator: AsyncGenerator[str, None]) -> AsyncGenerator[bytes, None]:async with websockets.connect(self.DEFAULT_API_ENDPOINT, extra_headers=self.get_headers()) as ws:try:await ws.send(protocol.start_connection_frame())response = await ws.recv()logging.debug(protocol.parse_frame(response))start_session_frame = protocol.start_session_frame(session_id=message_id, speaker=settings.VOL_TTS_SPEAKER)await ws.send(start_session_frame)response = await ws.recv()logging.debug(protocol.parse_frame(response))send_task = asyncio.create_task(self.send_task_frame(message_id, text_generator, ws))async for audio_chunk in self.receive_response(ws):yield audio_chunk# wait for send task to finishawait send_taskawait ws.send(protocol.finish_session_frame(message_id))except Exception as e:logging.error(e, exc_info=True)finally:await ws.send(protocol.finish_connection_frame())

test

from typing import AsyncGenerator

import pytest
from langchain_openai import ChatOpenAI

from ai_bot.tts.client import TtsClient

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
)


@pytest.mark.asyncio
async def test_run():
    """End-to-end smoke test: stream LLM output into TTS and save the audio."""
    client = TtsClient()

    async def a_text_generator() -> AsyncGenerator[str, None]:
        # Stream the model's reply chunk by chunk into the TTS session.
        async for chunk in llm.astream("你好"):
            yield str(chunk.content)

    combined_audio = bytearray()
    async for chunk in client.a_duplex_tts(
        message_id="test_session_id", text_generator=a_text_generator()
    ):
        combined_audio.extend(chunk)

    # The session requests mp3 output, so save with a matching extension
    # (the original wrote the mp3 bytes to a ".wav" file).
    with open("combined_audio.mp3", "wb") as audio_file:
        audio_file.write(combined_audio)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com