add python samples for long audio voice conversion #1822

Open · wants to merge 5 commits into base: master
44 changes: 44 additions & 0 deletions samples/python/console/long-audio-voice-conversion/audio_merger.py
@@ -0,0 +1,44 @@
# coding: utf-8

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root for full license information.

import logging
from pathlib import Path
from typing import List

from audio_splitter import Segment, SegmentLabel
from pydub import AudioSegment

LOG = logging.getLogger(__name__)


def merge_audio(
    input_audio: Path, segment_list: List[Segment], vc_output_dir: Path, output_audio: Path, sample_rate: int = 24000
) -> None:
    input_audio_data = AudioSegment.from_file(input_audio)
    output_audio_data = AudioSegment.empty()
    for seg_idx, start_time, end_time, label in segment_list:
        if label is SegmentLabel.speech:
            try:
                audio_data = AudioSegment.from_file(vc_output_dir / f"{seg_idx:06}.wav")
            except Exception:
                LOG.error("VC failed for seg {}, use original segment".format(seg_idx))
                audio_data = input_audio_data[start_time:end_time]
            audio_length = len(audio_data)
            duration = end_time - start_time
            if audio_length != duration:
                LOG.warning(
                    "{:06}.wav, duration changed after VC, {} - {} = {}ms".format(
                        seg_idx, audio_length, duration, audio_length - duration
                    )
                )
                # Pad with silence or truncate so the converted segment matches the source duration.
                if audio_length < duration:
                    audio_data += AudioSegment.silent(duration=duration - audio_length, frame_rate=sample_rate)
                else:
                    audio_data = audio_data[:duration]
        else:
            audio_data = input_audio_data[start_time:end_time]
        output_audio_data += audio_data
    output_audio.absolute().parent.mkdir(exist_ok=True)
    output_audio_data.export(output_audio, format="wav")
171 changes: 171 additions & 0 deletions samples/python/console/long-audio-voice-conversion/audio_splitter.py
@@ -0,0 +1,171 @@
# coding: utf-8

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root for full license information.

import logging
import time
from enum import Enum
from pathlib import Path
from typing import List, NamedTuple, Tuple

import azure.cognitiveservices.speech as speechsdk
from pydub import AudioSegment

LOG = logging.getLogger(__name__)


class SegmentLabel(Enum):
    """
    Defines all available segment labels:
    [failed, silence, speech]
    """

    failed = 0  # failed to get the segment label
    silence = 1  # silent segment
    speech = 2  # voiced segment


class Segment(NamedTuple):
    """Represents an audio segment."""

    idx: int  # the position of this segment among all segments
    start_time: int  # start time of this segment, in milliseconds
    end_time: int  # end time of this segment, in milliseconds
    label: SegmentLabel  # label of this segment


class AudioSplitter:
    def __init__(
        self, subscription: str, region: str, language: str = "zh-CN", padding_in_seconds: float = 0.1
    ) -> None:
        self.subscription = subscription
        self.region = region
        self.language = language
        self.padding_in_seconds = padding_in_seconds

    def split_audio(self, audio_path: Path, output_dir: Path) -> Tuple[List[Segment], Path]:
        output_dir.mkdir(parents=True, exist_ok=True)
        speech_segments = self._get_speech_segments(audio_path)
        segment_list = self._get_all_segments(audio_path, speech_segments)
        segment_output_dir = self.split_audio_by_segment_list(audio_path, segment_list, output_dir)
        return segment_list, segment_output_dir

    def _get_speech_segments(self, audio_path: Path) -> List[Tuple[int, int]]:
        LOG.info(f"Start recognition for {audio_path}.")
        audio_input = speechsdk.AudioConfig(filename=str(audio_path))
        speech_config = speechsdk.SpeechConfig(subscription=self.subscription, region=self.region)
        speech_config.speech_recognition_language = self.language
        speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_input)

        done = False
        speech_segments = []

        def recognized(evt: speechsdk.SpeechRecognitionEventArgs) -> None:
            result = evt.result
            if result.reason == speechsdk.ResultReason.RecognizedSpeech:
                start = int(result.offset / self.audio_index_to_time_stamp)  # milliseconds
                end = int((result.offset + result.duration) / self.audio_index_to_time_stamp)  # milliseconds
                LOG.info(f"[{start/1000:.2f}s, {end/1000:.2f}s]({(end-start)/1000:.2f}s) {result.text}")
                speech_segments.append((start, end))
            elif result.reason == speechsdk.ResultReason.NoMatch:
                LOG.error("No speech could be recognized: {}".format(result.no_match_details))
            elif result.reason == speechsdk.ResultReason.Canceled:
                cancellation_details = result.cancellation_details
                LOG.error("Speech Recognition canceled: {}".format(cancellation_details.reason))
                if cancellation_details.reason == speechsdk.CancellationReason.Error:
                    LOG.error("Error details: {}".format(cancellation_details.error_details))
                    LOG.error("Did you set the speech resource key and region values?")

        def stop_cb(evt: speechsdk.SessionEventArgs) -> None:
            LOG.debug(f"CLOSING on {evt}")
            nonlocal speech_recognizer, done
            speech_recognizer.stop_continuous_recognition()
            done = True

        speech_recognizer.session_started.connect(lambda evt: LOG.debug(f"SESSION STARTED: {evt}"))
        speech_recognizer.recognized.connect(recognized)
        speech_recognizer.session_stopped.connect(lambda evt: LOG.debug(f"SESSION STOPPED: {evt}"))
        speech_recognizer.session_stopped.connect(stop_cb)
        speech_recognizer.canceled.connect(lambda evt: LOG.debug(f"CANCELED: {evt}"))
        speech_recognizer.canceled.connect(stop_cb)

        speech_recognizer.start_continuous_recognition()
        while not done:
            time.sleep(0.1)
        del speech_recognizer  # release the recognizer's handle on audio_path
        LOG.info("Finish recognition")

        return speech_segments

    def _get_all_segments(self, audio_path: Path, speech_segments: List[Tuple[int, int]]) -> List[Segment]:
        audio_segment: AudioSegment = AudioSegment.from_file(audio_path)
        audio_length = len(audio_segment)  # in milliseconds
        segment_list = []
        idx = 1
        if len(speech_segments) == 0:
            LOG.warning("ASR result is empty; treating the segmentation of this file as failed.")
            segment_list.append(Segment(idx, 0, audio_length, SegmentLabel.failed))
            return segment_list

        # Get all voiced segments and silent segments, extending each voiced
        # segment by a small padding on both sides where room allows.
        last_end_time = 0
        padding = int(self.padding_in_seconds * self.second_to_audio_index)
        for i, (start_time, end_time) in enumerate(speech_segments):
            if start_time - last_end_time > padding:
                start_time -= padding
                segment_list.append(Segment(idx, last_end_time, start_time, SegmentLabel.silence))
                idx += 1
            else:
                start_time = last_end_time

            end_time = min(end_time, audio_length)  # in milliseconds
            next_start_time = audio_length if i == len(speech_segments) - 1 else speech_segments[i + 1][0]
            if next_start_time - end_time >= padding * 2:
                end_time += padding
            else:
                end_time += int((next_start_time - end_time) / 2)
            last_end_time = end_time

            segment_list.append(Segment(idx, start_time, end_time, SegmentLabel.speech))
            idx += 1

        if audio_length - last_end_time > 0:
            # fill out the trailing silent segment
            segment_list.append(Segment(idx, last_end_time, audio_length, SegmentLabel.silence))
        return segment_list

    def split_audio_by_segment_list(self, audio_path: Path, segment_list: List[Segment], output_dir: Path) -> Path:
        LOG.info(f"Start segmentation for {audio_path}.")
        audio_segment: AudioSegment = AudioSegment.from_file(audio_path)
        segment_output_dir = output_dir / "segments"
        segment_output_dir.mkdir(exist_ok=True)
        last_end_time = 0
        for segment in segment_list:
            idx = f"{segment.idx:06}"
            assert segment.start_time == last_end_time, "{} start_time({}) != last_end_time({})".format(
                idx, segment.start_time, last_end_time
            )
            last_end_time = segment.end_time
            split_segment_dir = segment_output_dir / segment.label.name
            split_segment_dir.mkdir(exist_ok=True)
            split_audio_output_path = split_segment_dir / f"{idx}.wav"
            split_audio_segment = audio_segment[segment.start_time : segment.end_time]
            split_audio_segment.export(split_audio_output_path, format="wav")
        LOG.info("Finish segmentation")
        return segment_output_dir

    @property
    def second_to_time_stamp(self) -> int:
        """Conversion factor from seconds to ASR result timestamps (in 10^-7 s)."""
        return 10**7

    @property
    def second_to_audio_index(self) -> int:
        """Conversion factor from seconds to AudioSegment indices (in milliseconds)."""
        return 10**3

    @property
    def audio_index_to_time_stamp(self) -> int:
        """Conversion factor from AudioSegment indices to ASR result timestamps."""
        # e.g., an ASR offset of 10**7 ticks is 1 s, i.e. AudioSegment index 1000 ms
        return int(self.second_to_time_stamp / self.second_to_audio_index)
82 changes: 82 additions & 0 deletions samples/python/console/long-audio-voice-conversion/long_audio_voice_changer.py
@@ -0,0 +1,82 @@
#!/usr/bin/env python
# coding: utf-8

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root for full license information.

import logging
from pathlib import Path

import librosa
import soundfile as sf
from audio_merger import merge_audio
from audio_splitter import AudioSplitter
from voice_changer import VoiceChanger

LOG = logging.getLogger(__name__)


def resample_wav(input_file: Path, output_file: Path, sample_rate: int = 16000) -> Path:
    audio_data, _sr = librosa.load(input_file, sr=sample_rate, mono=True)
    sf.write(file=output_file, data=audio_data, samplerate=sample_rate, subtype="PCM_16", format="WAV")
    return output_file


class LongAudioVoiceChanger:
    def __init__(
        self,
        subscription: str,
        region: str,
        container_url_with_sas: str,
        language: str,
        name: str,
    ) -> None:
        self.audio_splitter = AudioSplitter(subscription, region, language)
        self.voice_changer = VoiceChanger(subscription, region, container_url_with_sas, language, name)

    def convert_one_audio(self, input_audio: Path, output_audio: Path, output_dir: Path) -> None:
        # process input
        audio_16k = resample_wav(input_audio, output_dir / "16k.wav", 16000)
        audio_24k = resample_wav(input_audio, output_dir / "24k.wav", 24000)

        # split long audio
        segment_list, segment_output_dir = self.audio_splitter.split_audio(audio_16k, output_dir)

        # call the VC service for each voiced segment in parallel
        vc_output_dir = output_dir / "speech_vc"  # defined unconditionally so merge_audio can use it
        speech_segments_dir = segment_output_dir / "speech"
        if speech_segments_dir.is_dir():
            vc_output_dir.mkdir(parents=True, exist_ok=True)
            input_audio_list = []
            output_audio_list = []
            for speech_segment_file in speech_segments_dir.iterdir():
                input_audio_list.append(speech_segment_file)
                output_audio_list.append(vc_output_dir / speech_segment_file.name)
            self.voice_changer.convert_audios(input_audio_list, output_audio_list)

        # merge back into long audio
        merge_audio(audio_24k, segment_list, vc_output_dir, output_audio, sample_rate=24000)


# Demo for long audio voice conversion
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s\t%(name)s\t%(levelname)s\tP%(process)d\t%(message)s",
    )

    # Replace with your own subscription key, service region (e.g., "westus"), and container URL (with SAS token).
    subscription = "YourSubscriptionKey"
    region = "YourServiceRegion"
    container_url_with_sas = "YourContainerURLWithSAS"
    language = "zh-CN"
    name = "zh-CN-XiaoxiaoNeural"
    long_audio_vc = LongAudioVoiceChanger(subscription, region, container_url_with_sas, language, name)

    # Define input and output
    input_audio = Path("src.wav")  # input long audio file
    output_audio = Path("output/vc.wav")  # output long audio file
    output_dir = Path("output/debug")  # temporary output directory for debugging
    output_dir.mkdir(parents=True, exist_ok=True)

    long_audio_vc.convert_one_audio(input_audio, output_audio, output_dir)
63 changes: 63 additions & 0 deletions samples/python/console/long-audio-voice-conversion/readme.md
@@ -0,0 +1,63 @@
# Voice conversion (VC) for long audio using the Speech SDK

This demo shows how to convert a long audio file into another voice using the Speech SDK.

## Prerequisites

Refer to [this readme](../README.md) for the prerequisites and Speech SDK installation steps.

### Python packages

This sample requires the following Python packages:

- `pydub`: split the input audio.
- `librosa`: load and resample the input audio.
- `soundfile`: save the resampled output audio.
- `tqdm`: show a smart progress meter when converting multiple audio files.
- `azure-storage-blob`: access Azure Blob Storage.

Install them using the following command:

``` sh
python3 -m pip install pydub librosa soundfile tqdm azure-storage-blob
```

### Azure Blob Storage

When calling the voice conversion (VC) service, the location of the source audio is specified with `<mstts:voiceconversion url='XXX'/>` in SSML. The source `url` must be an HTTPS URL ([Azure Blob Storage](https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction) can host your audio files at an HTTPS URL).

This sample uses Azure Blob Storage to temporarily store the source audio; refer to [Create a container](https://learn.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-portal) to create a new container.

**Note**: you need to configure anonymous public read access for the **container**, otherwise the speech service cannot read the source audio in the Azure container. See [Configure anonymous public read access for containers and blobs](https://learn.microsoft.com/en-us/azure/storage/blobs/anonymous-read-access-configure?view=form-recog-3.0.0&tabs=portal#set-the-public-access-level-for-a-container).
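
For orientation only, here is a minimal sketch of what a single VC request could look like through the Speech SDK's SSML synthesis path. The SSML namespaces, the `convert_segment` helper name, and the result handling are assumptions rather than this sample's exact `VoiceChanger` code:

``` python
import azure.cognitiveservices.speech as speechsdk


def convert_segment(subscription: str, region: str, source_url: str,
                    language: str, name: str, output_wav: str) -> None:
    """Hypothetical helper: convert one audio clip hosted at source_url."""
    speech_config = speechsdk.SpeechConfig(subscription=subscription, region=region)
    audio_config = speechsdk.audio.AudioOutputConfig(filename=output_wav)
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=audio_config)
    # The source audio is referenced by URL inside <mstts:voiceconversion/>.
    ssml = (
        f"<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' "
        f"xmlns:mstts='https://www.w3.org/2001/mstts' xml:lang='{language}'>"
        f"<voice name='{name}'><mstts:voiceconversion url='{source_url}'/></voice>"
        f"</speak>"
    )
    result = synthesizer.speak_ssml_async(ssml).get()
    if result.reason != speechsdk.ResultReason.SynthesizingAudioCompleted:
        raise RuntimeError(f"VC request did not complete: {result.reason}")
```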

## Steps to convert long audio into another voice

The basic steps this sample uses to convert long audio into another voice are:

1. `Split audio`: first convert the input long audio to 16 kHz, 16-bit, single-channel WAV; then use the ASR service to get segment boundaries; and finally split the long audio into multiple contiguous short clips (voiced segments and silent segments).
2. `Call VC`: call the VC service for each voiced segment in parallel. For each clip, first upload it to Azure Blob Storage, then call the VC service for conversion, and finally delete the blob (see the sketch after this list).
3. `Merge audio`: merge the silent segments and the converted voiced segments back into a single audio file. Before splicing, adjust the length of each converted voiced segment to match the corresponding source voiced segment (truncate it, or append silence).
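
As a sketch of step 2's upload, convert, and delete flow (not the sample's actual `VoiceChanger` implementation), using `azure-storage-blob` and the hypothetical `convert_segment` helper from the earlier sketch:

``` python
# Sketch only: assumes anonymous public read access on the container (see the
# note above) so the speech service can fetch the blob by its plain URL.
from pathlib import Path

from azure.storage.blob import ContainerClient


def convert_via_blob(container_url_with_sas: str, input_wav: Path, output_wav: Path,
                     subscription: str, region: str, language: str, name: str) -> None:
    container = ContainerClient.from_container_url(container_url_with_sas)
    blob_name = input_wav.name
    with open(input_wav, "rb") as data:
        container.upload_blob(blob_name, data, overwrite=True)  # needs *write* permission
    try:
        # The service reads the blob anonymously, so strip the SAS query string.
        base_url = container_url_with_sas.split("?", 1)[0]
        source_url = f"{base_url}/{blob_name}"
        convert_segment(subscription, region, source_url, language, name, str(output_wav))
    finally:
        container.delete_blob(blob_name)  # needs *delete* permission
```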

## Run the sample

To run the app, navigate to the `samples/python/console/long-audio-voice-conversion` directory in your local copy of the samples repository.

Update the following strings before running the sample code (`long_audio_voice_changer.py`):

- `YourSubscriptionKey`: replace with your subscription key.
- `YourServiceRegion`: replace with the [region](https://aka.ms/csspeech/region) your subscription is associated with.
- `YourContainerURLWithSAS`: replace with your container URL with SAS token; refer to [Create SAS tokens for storage containers](https://learn.microsoft.com/en-us/azure/applied-ai-services/form-recognizer/create-sas-tokens). **Note**: the SAS token must have at least *read*, *write*, and *delete* permissions (a sketch of generating such a URL follows below).
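
If you prefer to mint the container URL programmatically, the following is a sketch using `azure-storage-blob` with your storage account key; the account, key, and container names are placeholders, and the four-hour expiry is an arbitrary choice:

``` python
from datetime import datetime, timedelta, timezone

from azure.storage.blob import ContainerSasPermissions, generate_container_sas

account_name = "youraccount"      # placeholder: your storage account name
account_key = "YourAccountKey"    # placeholder: your storage account key
container_name = "yourcontainer"  # placeholder: the container created earlier

# Grant exactly the permissions the sample needs: read, write, delete.
sas_token = generate_container_sas(
    account_name=account_name,
    container_name=container_name,
    account_key=account_key,
    permission=ContainerSasPermissions(read=True, write=True, delete=True),
    expiry=datetime.now(timezone.utc) + timedelta(hours=4),
)
container_url_with_sas = f"https://{account_name}.blob.core.windows.net/{container_name}?{sas_token}"
```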

You may also want to update other configuration values such as `language` (e.g., "zh-CN") and `name` (e.g., "zh-CN-XiaoxiaoNeural").

Start the app with the following command:

``` sh
python3 long_audio_voice_changer.py
```

Depending on your platform, the Python 3 executable might also just be called `python`.

## Note

The voice conversion service is currently available for *zh-CN-XiaoxiaoNeural* in all regions, *zh-CN-YunxiNeural* in the southeastasia region, and *en-US-GuyNeural* in the westus2 region.
6 changes: 6 additions & 0 deletions samples/python/console/long-audio-voice-conversion/requirements.txt
@@ -0,0 +1,6 @@
azure-cognitiveservices-speech
azure-storage-blob
tqdm
pydub
librosa
soundfile