Streaming Speech Recognition with Python (Linux)
First we create our virtual environment and install the requirements:
$ python3 -m venv .venv
$ . .venv/bin/activate
$ pip install tiro-apis@git+https://gitlab.com/tiro-is/tiro-apis.git@master
This will install Python code generated from this repo, and its dependencies
(mainly gRPC). You can also run protoc
manually on the .proto
files.
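For example, with the grpcio-tools package installed, something like the following should work from a checkout of the repo (the proto/ include path here is an assumption; adjust it to the actual repo layout):
$ pip install grpcio-tools
$ python -m grpc_tools.protoc -I proto --python_out=. --grpc_python_out=. \
      proto/lr/speech/v2beta1/speech.proto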
We can now import the generated modules and start using them:
import grpc

from lr.speech.v2beta1 import speech_pb2
from lr.speech.v2beta1.speech_pb2_grpc import SpeechStub
The class SpeechStub
is a gRPC stub we use to call RPCs on the service.
To communicate with the service we need to supply credentials. We compose SSL
channel credentials with access token call credentials:
creds = grpc.composite_channel_credentials(
    grpc.ssl_channel_credentials(),
    grpc.access_token_call_credentials(TIRO_ACCESS_TOKEN),
)
If you don't already have an access token, you can request one by emailing tiro@tiro.is.
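In these snippets TIRO_ACCESS_TOKEN is simply a Python string holding the token; the full example at the end of this page reads it from an environment variable of the same name:
import os

TIRO_ACCESS_TOKEN = os.environ["TIRO_ACCESS_TOKEN"]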
Now we can create a channel and connect a SpeechStub to the service:
server_url = "speech.talgreinir.is:443"
channel = grpc.secure_channel(server_url, creds)
stub = SpeechStub(channel)
Let's say we have a 16-bit WAV file sampled at 16 kHz. We can then stream chunks of audio to the service while simultaneously receiving partial transcriptions. We first define a request generator: the first request carries the streaming configuration, and every subsequent request carries a chunk of audio to be recognized:
import time
import wave
from typing import Iterator

def requests() -> Iterator[speech_pb2.StreamingRecognizeRequest]:
    # Placeholder path: any 16-bit PCM WAV file works.
    with wave.open("audio.wav", "rb") as wav_f:
        sample_rate_hertz = wav_f.getframerate()
        chunk_size_in_seconds = 0.5
        # The first request only carries the configuration, no audio.
        yield speech_pb2.StreamingRecognizeRequest(
            streaming_config=speech_pb2.StreamingRecognitionConfig(
                interim_results=True,
                config=speech_pb2.RecognitionConfig(
                    encoding=speech_pb2.RecognitionConfig.LINEAR16,
                    sample_rate_hertz=sample_rate_hertz,
                    language_code="is-IS",
                ),
            )
        )
        chunk_size_in_samples = int(sample_rate_hertz * chunk_size_in_seconds)
        # Each following request carries one chunk of raw audio.
        while True:
            chunk = wav_f.readframes(chunk_size_in_samples)
            if not chunk:
                break
            yield speech_pb2.StreamingRecognizeRequest(audio_content=chunk)
            # Throttle the stream a little, as a live microphone would.
            time.sleep(0.2)
And then we simply iterate over the generated responses and print the transcripts:
responses = stub.StreamingRecognize(requests())
final_transcripts = []
for response in responses:
    for res in response.results:
        if len(res.alternatives) > 0:
            transcript = res.alternatives[0].transcript
            # Print the finalized transcripts so far plus the current hypothesis.
            print("{}{}".format("".join(final_transcripts), transcript))
            if res.is_final:
                final_transcripts.append(transcript)
print()
Since we set interim_results=True we will receive partial transcripts as well
as final transcripts. For one audio file this might print out:
undir þessa
undir þessa rekstrareiningu fellur
undir þessa rekstrareiningu fellur allur verslana
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi með mat og
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi með mat og sérvöru
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi með mat og sérvöru auk reksturs í
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi með mat og sérvöru auk reksturs í Svíþjóð vegna Bónus
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi með mat og sérvöru auk reksturs í Svíþjóð vegna Bónuss og Hagkaups
undir þessa rekstrareiningu fellur allur verslunarrekstur á Íslandi með mat og sérvöru auk reksturs í Svíþjóð vegna Bónuss og Hagkaups
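The snippets assume 16-bit PCM (LINEAR16) WAV input. If your recording is in some other format or sample rate, one option is to convert it first, e.g. with ffmpeg (any tool that writes 16-bit PCM WAV will do; recording.mp3 is a placeholder name):
$ ffmpeg -i recording.mp3 -ac 1 -ar 16000 -c:a pcm_s16le recording.wav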
A full working example might look something like this:
# Copyright 2021 Tiro ehf. (Author: Róbert Kjaran)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import wave
from typing import Tuple, Iterator
import grpc
from lr.speech.v2beta1 import speech_pb2, speech_pb2_grpc
def main() -> None:
    import argparse

    parser = argparse.ArgumentParser(
        description="""Example usage of the Tiro Speech API

        Expects the environment variable TIRO_ACCESS_TOKEN to contain a valid
        token
        """,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--server-url",
        "-c",
        default="speech.talgreinir.is:443",
        help="Server to connect to",
    )

    def supported_filetypes(fname: str) -> Tuple[str, int]:
        # Only WAV is supported here; map it to the LINEAR16 encoding enum.
        _, ext = os.path.splitext(fname)
        if ext not in (".wav",):
            raise ValueError("Unsupported extension!")
        encoding = speech_pb2.RecognitionConfig.LINEAR16
        return (fname, encoding)

    parser.add_argument("audio_file", type=supported_filetypes)
    parser.add_argument(
        "--chunk-delay", type=float, default=0.2, help="Chunk delay in seconds",
    )
    parser.add_argument("--chunk-size-in-seconds", type=float, default=0.5)
    parser.add_argument("--language-code", type=str, default="is-IS")
    args = parser.parse_args()

    creds = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.access_token_call_credentials(os.environ["TIRO_ACCESS_TOKEN"]),
    )
    with grpc.secure_channel(args.server_url, creds) as channel:
        stub = speech_pb2_grpc.SpeechStub(channel)

        def requests() -> Iterator[speech_pb2.StreamingRecognizeRequest]:
            try:
                with wave.open(args.audio_file[0], "rb") as wav_f:
                    sample_rate_hertz = wav_f.getframerate()
                    # The first request only carries the configuration.
                    yield speech_pb2.StreamingRecognizeRequest(
                        streaming_config=speech_pb2.StreamingRecognitionConfig(
                            interim_results=True,
                            config=speech_pb2.RecognitionConfig(
                                encoding=args.audio_file[1],
                                sample_rate_hertz=sample_rate_hertz,
                                language_code=args.language_code,
                            ),
                        )
                    )
                    chunk_size_in_samples = int(
                        sample_rate_hertz * args.chunk_size_in_seconds
                    )
                    # Each following request carries one chunk of raw audio.
                    while True:
                        chunk = wav_f.readframes(chunk_size_in_samples)
                        if not chunk:
                            break
                        yield speech_pb2.StreamingRecognizeRequest(
                            audio_content=chunk
                        )
                        time.sleep(args.chunk_delay)
            except Exception as e:
                # gRPC swallows exceptions raised in request generators, so
                # log them before re-raising.
                print(e, file=sys.stderr)
                raise

        responses = stub.StreamingRecognize(requests())
        final_transcripts = []
        for response in responses:
            for res in response.results:
                if len(res.alternatives) > 0:
                    transcript = res.alternatives[0].transcript
                    # Finalized transcripts so far plus the current hypothesis.
                    print("{}{}".format("".join(final_transcripts), transcript))
                    if res.is_final:
                        final_transcripts.append(transcript)
        print()
if __name__ == "__main__":
    main()
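Saved as, say, transcribe.py (the filename is arbitrary), the example could be run like this:
$ export TIRO_ACCESS_TOKEN=<your token>
$ python transcribe.py audio.wav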