Lejdi Prifti

0 %
Lejdi Prifti
Software Engineer
DevOps Engineer
ML Practitioner
  • Residence:
    Albania
  • City:
    Tirana
  • Email:
    info@lejdiprifti.com
Spring
AWS & Cloud
Angular
Team Player
Coordination & Leadership
Time Management
Docker & Kubernetes
ReactJs
JavaScript
Python
  • Java, JavaScript, Python
  • AWS, Kubernetes, Azure
  • Bootstrap, Materialize
  • Css, Sass, Less
  • Blockchain, Ethereum, Solidity
  • React, React Native, Flutter
  • GIT knowledge
  • Machine Learning, Deep Learning
0

No products in the basket.

Real-time streaming with AWS Transcribe and Python

8. February 2024

In this article, we will add a new feature and continue developing the program we started in the last article. You guessed it right. It is real-time streaming with AWS Transcribe and Python.

If you didn’t go through the first part of this tutorial, make sure you do. It will help you get a better understanding of what AWS Transcribe does and how we can use it. By the time you finish this second tutorial, you will know how to use AWS Transcribe and FastAPI for real-time streaming.

Table of Contents

Transcribing streaming audio

By using Amazon Transcribe streaming, we may create transcriptions for our media content in real time. Whereas media files must be uploaded for batch transcriptions, streaming media is sent in real time to Amazon Transcribe. The transcript is then provided by Amazon Transcribe, again in real time.

How does streaming work?

Transcripts are generated in partial results because streaming operates in real time. The incoming audio stream is divided up by Amazon Transcribe according to genuine speech chunks, like a speaker switch or an audio pause. Amazon Transcribe starts returning transcription results as soon as we start streaming the audio. 

Once a segment is fully transcribed, the transcription is returned to our application as a stream of transcription events, with each response holding more recorded speech.

For instance, the real-time transcription of a two-second audio recording with the words “This is a test” captured would look like this. Each line is the partial result of the audio segment.

				
					This is a
This is a test
This is a test.
				
			

Until a speech segment’s final transcription result is produced, Amazon Transcribe keeps producing partial results. With each new partial result output, streaming transcriptions may vary somewhat because voice recognition may rewrite words as it learns more context.

Tutorial

In this tutorial, we are going to accomplish the following functional requirements.

  1. Upload an audio file through an endpoint in FastAPI.
  2. Transcribe the audio file using AWS Transcribe real-time streaming.
  3. Output the transcription partial results in real time using WebSocket.

Do you need assistance from an AWS expert?

You may easily schedule a call with me on TopMate.
CONTACT
Requirements

First of all, we must install a couple of libraries that we will need along the way.

  • websockets supports the creation of WebSocket connections.
  • python-multipart is necessary since we will be dealing with multipart data when uploading audio files.
  • amazon-transcribe is the official SDK provided by AWS Labs that provides the classes we need to create real time streaming with AWS Transcribe.
  • aiofile is used for asynchronous file I/O operations in Python. It provides an asynchronous interface for reading from and writing to files, which is particularly useful in asynchronous applications where blocking file operations could lead to decreased performance or concurrency issues.
				
					amazon-transcribe==0.6.2
aiofile==3.8.8
websockets==12.0
python-multipart==0.0.7
				
			
Endpoint Configuration

To begin with, we will create a POST endpoint that will accept an audio format as input. In our main.py file, practically where we have created an instance of the FastAPI class, we will create our endpoint. 

The updated file will look as follows. The endpoint /audio accepts an audio file and saves it in the assets directory of our project. If you have not created it yet, this is the moment to do so. At the end, this endpoint returns the name of the file we uploaded.

				
					from fastapi import FastAPI, File, HTTPException, UploadFile

from src.routers import Router
from src.routers.online_router import OnlineRouter
from src.routers.transcribe_router import TranscribeRouter

app = FastAPI()

@app.router.post("/audio", response_model=None)
async def add_audio_file(file: UploadFile = File(...)):
    if not file.content_type.startswith('audio/'):
            raise HTTPException(status_code=400, detail="Only audio files are allowed")

    with open(f"assets/{file.filename}", "wb") as audio_file:
        audio_file.write(await file.read())

    return file.filename

router = Router()

online_router = OnlineRouter().router
transcribe_router = TranscribeRouter().router
routers_list = [online_router, transcribe_router]
router.attach_router(app, routers_list)

				
			
WebSocket Configuration

In the transcribe_router.py file that we created in the previous article, we will add a new endpoint that accepts WebSocket traffic.

It accepts the WebSocket connection and then, enters a loop in which it waits to be sent a filename. After receiving the filename, the transcription process starts. This process is done by method start_stream of the TranscribeService class. If you enter a whitespace, the loop exits and the connection is closed.

				
					    @Router.router.websocket("/stream")
    async def start_streaming(websocket: WebSocket):
        await websocket.accept()
        try:
            while True:
                filename = await websocket.receive_text()
                if filename.isspace():
                    break
                await TranscribeRouter.transcribe_service.start_stream(
                    filename, websocket
                )
        except Exception as e:
            print(f"Error: {e}")
        
        await websocket.close()
				
			
Transcribe Process

There are two key components to the transcription process. One is the custom event handler class named TranscribeEventHandler that extends TranscriptResultStreamHandler from the amazon_transcribe.handlers module. It’s designed to handle transcript events received from the Amazon Transcribe service and sends the transcripts over a WebSocket connection. More details about it are coming in a moment.

The other component is the asynchronous method start_stream that initiates streaming transcription from an audio file to the Amazon Transcribe service and sends the transcription results over a WebSocket connection. 

Let’s begin by looking at the TranscribeEventHandler.

				
					from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent, TranscriptResultStream
from fastapi import WebSocket


class TranscribeEventHandler(TranscriptResultStreamHandler):
    def __init__(
        self, transcript_result_stream: TranscriptResultStream, websocket: WebSocket
    ):
        super().__init__(transcript_result_stream)
        self.websocket = websocket

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        results = transcript_event.transcript.results
        for result in results:
            for alt in result.alternatives:
                await self.websocket.send_text(alt.transcript)
				
			
TranscribeEventHandler

The constructor initializes the TranscribeEventHandler object, which takes two parameters:

  • transcript_result_stream: An instance of TranscriptResultStream representing the stream of transcript results from Amazon Transcribe.
  • websocket: An instance of WebSocket representing the WebSocket connection to which the transcripts will be sent.

Furthmore, handle_transcript_event overrides the same method from the superclass. It receives a transcript_event parameter, which is an instance of TranscriptEvent containing transcript results, extracts the transcript results from the event and iterates over them. Finally, for each result, it iterates over the alternatives and sends each alternative’s transcript over the WebSocket connection.

Start Stream method

Let’s look now at the start_stream method, which resides in the transcribe_service.py file we saw in the previous article

				
					async def start_stream(self, filename: str, websocket: WebSocket):
        stream = await self.streaming_client.start_stream_transcription(
            language_code="en-US",
            media_sample_rate_hz=16000,
            media_encoding="pcm",
        )

        async def write_chunks():
            async with aiofile.AIOFile(f"assets/{filename}", "rb") as afp:
                reader = aiofile.Reader(afp, chunk_size=1024 * 16)
                async for chunk in reader:
                    await stream.input_stream.send_audio_event(audio_chunk=chunk)
            await stream.input_stream.end_stream()

        handler = TranscribeEventHandler(stream.output_stream, websocket)
        await asyncio.gather(write_chunks(), handler.handle_events())
				
			

The method It takes two parameters:

  • filename: A string representing the name of the audio file to be transcribed.
  • websocket: As already mentioned, an instance of WebSocket to which the transcription results will be sent.

Next, we initiate a streaming transcription session with the Amazon Transcribe service using the start_stream_transcription method of streaming_client. It allows us to specifiy parameters such as language code, media sample rate, and media encoding.

Starting at line 8, an inner function called write_chunks is used to transfer audio data to the transcription stream by reading it in segments. It opens the audio file using aiofile.AIOFile for asynchronous file I/O, iteratively reads chunks of audio data from the file and finally, sends them to the transcription stream using stream.input_stream.end_stream().

Then, on line 15 we create an instance of TranscribeEventHandler, passing the output stream of the transcription session and the WebSocket connection as arguments.

Finally, on line 16 we use asyncio.gather to concurrently execute the write_chunks function and handler.handle_events method.

  • write_chunks() asynchronously reads audio data from the file and sends it to the transcription service.
  • handler.handle_events() asynchronously listens for transcription events from the service and sends them over the WebSocket connection.

Testing

We have everything we need. Now it is time to test. Let me remind you of the command we use to run the application.

				
					docker-compose up --build
				
			

I used Swagger to upload from the endpoint we created a file named test.wav that says 4 simple words.

This is a test.

Next, I am using the following command from the terminal of an Ubuntu machine to open a WebSocket connection with the endpoint /stream.

 

				
					wscat -c  ws://localhost:3000/stream
				
			

Afterwards, I type the name of the file I want to transcribe. It is test.wav. You guessed it right. This is the result. 

				
					Connected (press CTRL+C to quit)
> test.wav
< This is a
< This is a test.
< This is a test.
> test.wav
< This is a
< This is a test.
< This is a test.
>  
Disconnected (code: 1000, reason: "")
				
			

Thanks

Happy coding!

Posted in DevOps, PythonTags:
4 Comments
Write a comment