Skip to content

文字转语音 TTS edge 大声朗读

听说微软的文字转语音(TTS)是声音最接近真人的人工智能引擎,随后在 GitHub 上找到了一个开源项目:https://github.com/rany2/edge-tts

使用 GPT 提取其核心逻辑,生成了下面的 Go 实现:

go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"log"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// Connection constants.
//
// TrustedClientToken is sent as the TrustedClientToken query parameter and is
// also appended to the timestamp that generateSecMsGec hashes. BaseURL and
// WSSPath are the host and path of the wss:// endpoint dialed in main.
// SecMsGecVersion is sent as the Sec-MS-GEC-Version query parameter.
// DefaultVoice is the voice name main passes to mkSSML.
const (
	TrustedClientToken = "6A5AA1D4EAFF4E9FB37E23D68491D6F4"
	BaseURL            = "speech.platform.bing.com"
	WSSPath            = "/consumer/speech/synthesize/readaloud/edge/v1"
	SecMsGecVersion    = "1-140.0.3485.14" // example version number; must be updated in step (presumably with the Edge release the service expects — confirm)
	DefaultVoice       = "zh-CN-XiaoxiaoNeural"
)

// DRM-related parameters used by generateSecMsGec.
//
// WinEpochSeconds is added to a Unix timestamp (seconds) to rebase it onto the
// epoch starting in 1601. SToNs is the number of nanoseconds per second;
// generateSecMsGec multiplies by SToNs/100 to express seconds in 100 ns units.
const (
	WinEpochSeconds = 11644473600
	SToNs           = 1e9
)

// generateSecMsGec builds the Sec-MS-GEC token that main sends as a query
// parameter when opening the WebSocket connection (modelled on
// generate_sec_ms_gec in edge-tts's drm.py).
//
// clockSkewSeconds is added to the local UTC clock before the token is
// derived; it may be fractional. The result is the upper-case hex SHA-256
// digest of the timestamp — rebased to the 1601 epoch, rounded down to a
// 5-minute boundary and expressed in 100 ns units — followed by
// TrustedClientToken.
func generateSecMsGec(clockSkewSeconds float64) string {
	// Current Unix time in seconds, corrected by the caller-supplied skew.
	now := float64(time.Now().UTC().Unix()) + clockSkewSeconds

	// Rebase onto the 1601 epoch and truncate to whole seconds. Truncating
	// first guarantees that a fractional skew cannot survive the rounding
	// step below and leak into the hashed value.
	secs := int64(now + WinEpochSeconds)

	// Round down to the nearest 5 minutes (300 seconds).
	secs -= secs % 300

	// Convert seconds to 100-nanosecond units. Integer arithmetic keeps the
	// value exact; it stays far below the int64 limit.
	ticks := secs * int64(SToNs/100)

	// Hash "<ticks><token>" and return the digest as upper-case hex.
	strToHash := fmt.Sprintf("%d%s", ticks, TrustedClientToken)
	digest := sha256.Sum256([]byte(strToHash))

	return strings.ToUpper(hex.EncodeToString(digest[:]))
}

// mkSSML wraps text in the SSML document sent to the synthesis service.
//
// voice is placed in the <voice name='…'> attribute; pitch, rate and volume
// are placed in the matching <prosody> attributes (main passes values such as
// "+0Hz" and "+0%"). text is plain text, not markup: the XML special
// characters &, < and > are escaped so that arbitrary input cannot produce a
// malformed document. The attribute arguments are inserted verbatim.
func mkSSML(text, voice, pitch, rate, volume string) string {
	// "&" is listed first for readability only; Replacer never rescans its
	// own output, so "&lt;" is not escaped a second time.
	escaped := strings.NewReplacer(
		"&", "&amp;",
		"<", "&lt;",
		">", "&gt;",
	).Replace(text)

	return fmt.Sprintf(
		`<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' xml:lang='en-US'>
            <voice name='%s'>
                <prosody pitch='%s' rate='%s' volume='%s'>%s</prosody>
            </voice>
        </speak>`,
		voice, pitch, rate, volume, escaped,
	)
}

// ssmlHeadersPlusData formats the text frame that carries an SSML request:
// four CRLF-terminated header lines (X-RequestId, Content-Type, X-Timestamp,
// Path), an empty line, then the SSML body. A literal "Z" is appended to
// timestamp in the X-Timestamp header.
func ssmlHeadersPlusData(requestId, timestamp, ssml string) string {
	const crlf = "\r\n"

	// Every operand is a string, so fmt.Sprint joins them without inserting
	// any separators.
	return fmt.Sprint(
		"X-RequestId:", requestId, crlf,
		"Content-Type:application/ssml+xml", crlf,
		"X-Timestamp:", timestamp, "Z", crlf,
		"Path:ssml", crlf,
		crlf,
		ssml,
	)
}

// connectID returns a freshly generated UUID in its string form with the
// dashes stripped out.
func connectID() string {
	return strings.ReplaceAll(uuid.New().String(), "-", "")
}

// main synthesizes a fixed sentence with DefaultVoice and stores the audio in
// output.mp3.
//
// It opens the WebSocket endpoint (authenticated via query parameters), sends
// a speech.config message followed by the SSML request, then reads frames
// until a text frame containing "turn.end" arrives or reading fails. The
// payload of every binary frame is appended to the output file. The program
// exits with a fatal log message if the dial, a send, or the file creation
// fails, or if no audio was received at all.
func main() {
	// Clock skew correction in seconds; starts at zero.
	clockSkewSeconds := 0.0

	// Derive the Sec-MS-GEC token.
	secMsGec := generateSecMsGec(clockSkewSeconds)
	fmt.Println("Sec-MS-GEC Token:", secMsGec)

	// Build the WebSocket URL.
	u := url.URL{
		Scheme: "wss",
		Host:   BaseURL,
		Path:   WSSPath,
	}
	query := url.Values{}
	query.Set("TrustedClientToken", TrustedClientToken)
	query.Set("Sec-MS-GEC", secMsGec)
	query.Set("Sec-MS-GEC-Version", SecMsGecVersion)
	query.Set("ConnectionId", connectID())
	u.RawQuery = query.Encode()

	fmt.Println("Connecting to:", u.String())

	// Open the WebSocket connection.
	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Fatal("dial:", err)
	}
	defer c.Close()

	// Send the speech.config message (output format and metadata options).
	// NOTE(review): this timestamp uses RFC1123 while the SSML message below
	// uses a different layout — confirm the service accepts both.
	speechConfig := `{"context":{"synthesis":{"audio":{"metadataoptions":{"sentenceBoundaryEnabled":"false","wordBoundaryEnabled":"true"},"outputFormat":"audio-24khz-48kbitrate-mono-mp3"}}}}`
	speechConfigMsg := fmt.Sprintf(
		"X-Timestamp:%s\r\nContent-Type:application/json; charset=utf-8\r\nPath:speech.config\r\n\r\n%s\r\n",
		time.Now().UTC().Format(time.RFC1123), speechConfig,
	)
	err = c.WriteMessage(websocket.TextMessage, []byte(speechConfigMsg))
	if err != nil {
		log.Fatal("write speech.config:", err)
	}

	// Build the SSML document.
	text := "当前免费计划限制最大上下文tokens"
	ssml := mkSSML(text, DefaultVoice, "+0Hz", "+0%", "+0%")

	// Send the SSML request.
	ssmlMsg := ssmlHeadersPlusData(connectID(), time.Now().UTC().Format("Mon Jan 2 2006 15:04:05 GMT-0700 (MST)"), ssml)
	err = c.WriteMessage(websocket.TextMessage, []byte(ssmlMsg))
	if err != nil {
		log.Fatal("write ssml:", err)
	}

	// Open the file that receives the audio.
	audioFile, err := os.Create("output.mp3")
	if err != nil {
		log.Fatal("create file:", err)
	}
	defer audioFile.Close()

	audioReceived := false

	// Read frames until the turn ends or the connection fails. The label is
	// required because a bare break inside the switch would only leave the
	// switch, not the loop.
readLoop:
	for {
		messageType, message, err := c.ReadMessage()
		if err != nil {
			log.Println("read:", err)
			break readLoop
		}

		switch messageType {
		case websocket.TextMessage:
			// Text frames carry metadata; they are only printed here.
			msg := string(message)
			fmt.Println("Text message:", msg)
			if strings.Contains(msg, "turn.end") {
				// End of the synthesis turn: close and stop reading.
				c.Close()
				fmt.Printf("发现 END break 结束 Close")
				break readLoop
			}
			fmt.Printf("继续")

		case websocket.BinaryMessage:
			// Binary frames carry audio: a 2-byte big-endian header length,
			// the header itself, then the audio payload.
			if len(message) < 2 {
				log.Println("binary message too short")
				continue
			}

			headerLength := int(message[0])<<8 | int(message[1])
			// The payload starts at headerLength+2, so that offset (not just
			// headerLength) must lie within the frame or the slice below
			// would panic.
			if headerLength+2 > len(message) {
				log.Println("invalid header length")
				continue
			}

			// The header content is not parsed.
			audioData := message[headerLength+2:]
			if len(audioData) == 0 {
				log.Println("empty audio data")
				continue
			}

			// Append the payload to the output file; stop on the first
			// write failure.
			if _, err := audioFile.Write(audioData); err != nil {
				log.Println("write audio file error:", err)
				break readLoop
			}
			audioReceived = true

		default:
			log.Println("unknown message type:", messageType)
		}
	}

	if !audioReceived {
		log.Fatal("No audio received from the service")
	}

	fmt.Println("Audio saved to output.mp3")
}

Released under the MIT License.