+
9
-

回答

我们将使用 ECAPA-TDNN (Emphasized Channel Attention, Propagation, and Aggregation in Time Delay Neural Network),这是一种非常强大的深度神经网络,专门用于从语音中提取高度可区分的 说话人嵌入 (Speaker Embedding)

这个系统的核心思想是:

特征提取: 将原始音频波形转换为 FBank (Filter Banks)Mel Spectrogram 特征。这两种特征都比 MFCC 保留了更多的原始信号信息,更适合作为深度学习模型的输入。我们将使用 torchaudio 来完成这一步。

嵌入提取: 使用一个预训练好的 ECAPA-TDNN 模型将可变长度的声学特征转换成一个固定长度的向量(例如,192维)。这个向量就是“声纹嵌入”,它在数学上代表了说话人的声音特性。来自同一个人的不同语音,其嵌入向量在向量空间中的距离会非常近。

录入与识别:

录入 (Enrollment): 录制用户的一段或多段语音,提取其嵌入向量。为了更稳定,通常会取多段语音嵌入的平均值作为该用户的最终“声纹模板”。

识别 (Recognition): 录制一段待测语音,提取其嵌入向量,然后计算它与数据库中所有已录入声纹模板的余弦相似度 (Cosine Similarity)。相似度最高的那个就是识别结果。

我们将使用 speechbrain 库,它极大地简化了加载和使用在海量数据(如 VoxCeleb)上预训练好的 SOTA (State-of-the-Art) 模型的过程。

第 1 步:安装必要的库

这个方案需要更多的库,特别是 PyTorch 和 SpeechBrain。请在终端中运行以下命令:

# 安装 PyTorch (请访问 PyTorch 官网获取最适合你系统的命令,特别是如果你有 GPU)
# CPU-only version:
pip install torch torchaudio

# 安装其他依赖
pip install speechbrain sounddevice numpy scipy huggingface_hub

第 2 步:编写代码

将以下代码保存为 deep_speaker_recognition.py。代码包含了完整的录入、识别流程,并有详细注释。

import os
import torch
import torchaudio
import sounddevice as sd
import numpy as np
import torch.nn.functional as F
import time

# --- 配置参数 ---
SAMPLE_RATE = 16000       # 模型期望的采样率 (Hz)
RECORD_SECONDS = 5        # 每次录音的时长 (秒)
EMBEDDINGS_FILE = "speaker_embeddings.pth" # 存放声纹嵌入的文件
ENROLL_COUNT = 3          # 为了获得更稳定的声纹,建议每人录入3次
SIMILARITY_THRESHOLD = 0.50 # 余弦相似度阈值,低于此值则认为是未知用户

class DeepSpeakerRecognizer:
    """
    使用预训练的 ECAPA-TDNN 模型进行声纹录入和识别。
    """
    def __init__(self, embeddings_file=EMBEDDINGS_FILE):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"正在使用设备: {self.device}")

        # 1. 加载预训练的 ECAPA-TDNN 模型
        print("正在加载预训练的 ECAPA-TDNN 模型...")
        try:
            from speechbrain.pretrained import EncoderClassifier
            self.model = EncoderClassifier.from_hparams(
                source="speechbrain/spkrec-ecapa-voxceleb",
                savedir="pretrained_models/spkrec-ecapa-voxceleb",
                run_opts={"device": self.device}
            )
            print("模型加载成功!")
        except Exception as e:
            print(f"模型加载失败: {e}")
            print("请确保已安装 speechbrain 并且网络连接正常。")
            exit()

        # 2. 加载已保存的声纹嵌入
        self.embeddings_file = embeddings_file
        self.speaker_embeddings = self._load_embeddings()
        print("-" * 30)

    def _load_embeddings(self):
        """从文件加载嵌入向量"""
        if os.path.exists(self.embeddings_file):
            print(f"从 {self.embeddings_file} 加载已存在的声纹嵌入...")
            return torch.load(self.embeddings_file, map_location=self.device)
        return {}

    def _save_embeddings(self):
        """将嵌入向量保存到文件"""
        torch.save(self.speaker_embeddings, self.embeddings_file)
        print(f"声纹嵌入已保存至 {self.embeddings_file}")

    def _record_audio(self, duration, sample_rate, prompt=""):
        """录制指定时长的音频"""
        if prompt:
            print(prompt)
        print("准备录音...")
        time.sleep(1)
        print(f"请在倒计时结束后开始说话,录音将持续 {duration} 秒。")
        for i in range(3, 0, -1):
            print(f"{i}...")
            time.sleep(1)

        print("开始录音!")
        recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=1, dtype='float32')
        sd.wait()
        print("录音结束!")

        return torch.from_numpy(recording.flatten())

    def _preprocess_audio(self, waveform):
        """对音频进行预处理,使其符合模型输入要求"""
        # 模型需要单声道 (mono) 音频
        if waveform.ndim > 1:
            waveform = torch.mean(waveform, dim=0)

        # 确保采样率为 16000 Hz
        # (此处录音时已指定,如果加载文件则需要重采样)

        return waveform.to(self.device)

    def _get_embedding(self, waveform):
        """使用模型提取声纹嵌入向量"""
        with torch.no_grad():
            # SpeechBrain 模型需要 batch 维度
            waveform = waveform.unsqueeze(0)
            embedding = self.model.encode_batch(waveform)
            # [1, 1, D] -> [D]
            embedding = embedding.squeeze() 
            # L2 归一化
            embedding = F.normalize(embedding, p=2, dim=0)
            return embedding

    def enroll(self, speaker_name):
        """录入一个新的说话人声纹"""
        if not speaker_name:
            print("错误:说话人姓名不能为空。")
            return

        print(f"\n--- 开始为 '{speaker_name}' 录入声纹 ---")
        print(f"为了保证准确性,需要录制 {ENROLL_COUNT} 次。")

        embeddings = []
        for i in range(ENROLL_COUNT):
            prompt = f"第 {i+1}/{ENROLL_COUNT} 次录音。请说一些话,内容可以不同。"
            audio_waveform = self._record_audio(RECORD_SECONDS, SAMPLE_RATE, prompt=prompt)
            processed_audio = self._preprocess_audio(audio_waveform)

            print(f"正在提取第 {i+1} 段录音的嵌入...")
            embedding = self._get_embedding(processed_audio)
            embeddings.append(embedding)

        # 计算平均嵌入作为最终的声纹模板
        final_embedding = torch.mean(torch.stack(embeddings), dim=0)
        final_embedding = F.normalize(final_embedding, p=2, dim=0) # 再次归一化

        self.speaker_embeddings[speaker_name] = final_embedding
        self._save_embeddings()

        print(f"\n成功!'{speaker_name}' 的声纹已录入。")

    def recognize(self):
        """识别说话人"""
        if not self.speaker_embeddings:
            print("\n错误:系统中没有任何已录入的声纹模型,请先录入。")
            return

        print("\n--- 开始声纹识别 ---")

        # 1. 录制待识别的音频
        audio_waveform = self._record_audio(RECORD_SECONDS, SAMPLE_RATE)
        processed_audio = self._preprocess_audio(audio_waveform)

        # 2. 提取待测嵌入
        print("正在提取待识别语音的嵌入...")
        test_embedding = self._get_embedding(processed_audio)

        # 3. 计算与所有已录入声纹的余弦相似度
        similarities = {}
        for name, enrolled_embedding in self.speaker_embeddings.items():
            # F.cosine_similarity 需要 batch 维度
            similarity = F.cosine_similarity(test_embedding.unsqueeze(0), enrolled_embedding.unsqueeze(0)).item()
            similarities[name] = similarity

        # 4. 找出最匹配的结果
        if not similarities:
            print("计算相似度失败。")
            return

        best_match_name = max(similarities, key=similarities.get)
        best_similarity = similarities[best_match_name]

        print("\n--- 识别结果 ---")
        for name, sim in sorted(similarities.items(), key=lambda item: item[1], reverse=True):
            print(f" - 与 '{name}' 的相似度: {sim:.4f}")

        if best_similarity >= SIMILARITY_THRESHOLD:
            print(f"\n最匹配的人是: ** {best_match_name} ** (相似度: {best_similarity:.4f})")
        else:
            print(f"\n未找到匹配的用户。最高相似度 ({best_similarity:.4f}) 低于阈值 ({SIMILARITY_THRESHOLD})。")
            print("识别为: ** 未知用户 **")

    def list_speakers(self):
        """列出所有已录入的用户"""
        print("\n--- 已录入的用户列表 ---")
        if not self.speaker_embeddings:
            print("系统中没有已录入的用户。")
        else:
            for i, name in enumerate(self.speaker_embeddings.keys(), 1):
                print(f"{i}. {name}")
        print("-" * 30)


def main():
    """主函数,提供命令行交互界面"""
    recognizer = DeepSpeakerRecognizer()

    while True:
        print("\n请选择一个操作:")
        print("1. 录入新的声纹 (Enroll)")
        print("2. 识别说话人 (Recognize)")
        print("3. 查看已录入列表 (List)")
        print("4. 退出 (Exit)")

        choice = input("请输入选项 (1/2/3/4): ").strip()

        if choice == '1':
            name = input("请输入您的姓名(仅限英文和数字): ").strip()
            recognizer.enroll(name)
        elif choice == '2':
            recognizer.recognize()
        elif choice == '3':
            recognizer.list_speakers()
        elif choice == '4':
            print("感谢使用,再见!")
            break
        else:
            print("无效的输入,请输入 1, 2, 3 或 4。")

if __name__ == "__main__":
    main()

第 3 步:如何运行和使用

保存代码: 将代码保存为 deep_speaker_recognition.py。

首次运行: 第一次运行时,speechbrain 会自动从 Hugging Face Hub 下载预训练的 ECAPA-TDNN 模型,这可能需要一些时间,请耐心等待。模型文件会保存在 pretrained_models 目录下。

运行程序: 在终端中执行 python deep_speaker_recognition.py。

使用流程:

录入声纹 (Enroll):

在主菜单输入 1。

输入用户名,例如 alice。

程序会提示你进行 3 次录音。每次录音前都有 3 秒倒计时,请在倒计时后清晰、自然地说话 5 秒钟。

完成后,程序会将 alice 的平均声纹嵌入保存在 speaker_embeddings.pth 文件中。

识别声纹 (Recognize):

在主菜单输入 2。

程序会要求你录制一段 5 秒的音频。

录制完成后,它会计算这段音频的嵌入与数据库中所有已录入声纹的余弦相似度

程序会打印出与所有用户的相似度分数,并根据最高的相似度及预设的阈值来判断说话人是谁,或者判断为“未知用户”。

网友回复

我知道答案,我要回答