Lejdi Prifti

Speech to text with Amazon Transcribe, SNS and S3

6. March 2024

In the previous articles we looked at how to transcribe audio to text with AWS services such as Transcribe and Simple Storage Service (S3). “Quick tutorial to AWS Transcribe with Python” covers the batch mode of audio transcription with AWS Transcribe and S3, while “Real-time streaming with AWS Transcribe and Python” covers the real-time mode, which uses only AWS Transcribe.

In this article, however, we are going to design a full AWS architecture for converting speech to text. We are going to use Amazon Transcribe, S3, Simple Notification Service (SNS), WebSocket, Redis and FastAPI, a Python framework.

Solution design

The implementation of a solution always begins with the design. The image below is the architecture I have designed. 

The flow starts with the user recording audio on their mobile phone. At the same time, the mobile phone opens a WebSocket connection with the speech-to-text microservice. The phone then sends the audio file to the microservice, which runs on an EC2 instance in a public subnet of our VPC. The microservice uploads the file to the S3 bucket and stores in Redis a pairing between the id of the WebSocket session and the upcoming Transcribe job. If the file has been uploaded successfully, the microservice calls the AWS Transcribe API to begin the transcription process. The time Transcribe needs varies and depends mainly on the size of the file.

When the Transcribe job is finished, it uploads the transcription JSON to the output bucket on S3. S3 publishes a notification to an SNS topic, which in turn sends a POST request to the speech-to-text microservice on EC2.

Finally, the microservice reads the id of the WebSocket session it stored earlier, looks up that WebSocket connection in the pool by id and sends the transcription to the appropriate user.

Let’s see how this is done in practice.

WebSocket connection

We begin by setting up a way to store the WebSocket connections that will be created. Each WebSocket connection needs a unique identifier, and each user / mobile phone has its own connection.

That’s why we create a websocket_pool that stores a mapping between the session_id of the WebSocket and the WebSocket connection itself. The methods are static and help us store a connection, get it by id, send a response over a specific connection and, finally, remove the connection from the pool.

				
import logging
from typing import Dict, Optional

from fastapi import WebSocket

# Any logger works here; the standard library one is assumed.
logger = logging.getLogger(__name__)


class NotificationService:
    # Maps each session_id to its open WebSocket connection.
    websocket_pool: Dict[str, WebSocket] = {}

    @staticmethod
    def store_websocket_connection(session_id: str, websocket: WebSocket):
        NotificationService.websocket_pool[session_id] = websocket

    @staticmethod
    def get_websocket_connection(session_id: str) -> Optional[WebSocket]:
        return NotificationService.websocket_pool.get(session_id)

    @staticmethod
    async def send_response(session_id: str, response):
        websocket = NotificationService.get_websocket_connection(session_id)
        if websocket:
            logger.info(f"got websocket connection for session ID: {session_id}")
            await websocket.send_json(response)
        else:
            logger.error(f"no websocket connection found for session ID: {session_id}")

    @staticmethod
    def remove_websocket_connection(session_id: str):
        if session_id in NotificationService.websocket_pool:
            del NotificationService.websocket_pool[session_id]
            logger.info(f"removed websocket connection for session ID: {session_id}")
        else:
            logger.debug(f"no websocket connection found for session ID: {session_id}")
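
To see the pool in isolation, the sketch below runs the same store / send / remove cycle against a stand-in connection object. `FakeWebSocket` and `ConnectionPool` are hypothetical names used only for this illustration; in the service, the real FastAPI `WebSocket` takes their place.

```python
import asyncio
from typing import Any, Dict, List, Optional


class FakeWebSocket:
    """Hypothetical stand-in for a WebSocket; it records what would be sent."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    async def send_json(self, data: Any) -> None:
        self.sent.append(data)


class ConnectionPool:
    """Minimal pool keyed by session id, mirroring the idea of websocket_pool."""

    def __init__(self) -> None:
        self._pool: Dict[str, FakeWebSocket] = {}

    def store(self, session_id: str, websocket: FakeWebSocket) -> None:
        self._pool[session_id] = websocket

    def get(self, session_id: str) -> Optional[FakeWebSocket]:
        return self._pool.get(session_id)

    def remove(self, session_id: str) -> None:
        self._pool.pop(session_id, None)

    async def send(self, session_id: str, response: Any) -> bool:
        websocket = self.get(session_id)
        if websocket is None:
            return False  # the client has already disconnected
        await websocket.send_json(response)
        return True


async def demo() -> List[bool]:
    pool = ConnectionPool()
    phone = FakeWebSocket()
    pool.store("session-1", phone)
    delivered = await pool.send("session-1", {"transcript": "hello"})
    pool.remove("session-1")
    missed = await pool.send("session-1", {"transcript": "too late"})
    return [delivered, missed]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the pool lives in the memory of one process, this design presumably relies on the microservice running as a single worker; with several workers, a callback could reach a process that does not hold the connection.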
				
			

What is left to do is to use the NotificationService in our NotificationRouter. In this router, we define a WebSocket endpoint /notification and read from the WebSocket handshake the header named sec-websocket-key, a random value that the client generates for each connection. It will serve as our session_id.

We save the connection and the session_id into the pool and send the session_id back to the other participant in the connection. For each transcription that the user starts, we expect to receive the session_id as well.

Finally, we keep the connection open with a loop that constantly waits for messages to arrive. However, messages only go in one direction, from the microservice to the mobile application.

				
class NotificationRouter(Router):

    @Router.router.websocket("/notification")
    async def notify(websocket: WebSocket):
        session_id = None  # defined up front so the finally block can always use it
        try:
            await websocket.accept()
            session_id = websocket.headers.get("sec-websocket-key")
            NotificationService.store_websocket_connection(session_id, websocket)
            await websocket.send_json({"session_id": session_id})
            # Wait on incoming frames to keep the connection open.
            while True:
                await websocket.receive_text()
        except Exception as e:
            # A client disconnect also ends up here and simply ends the loop.
            logger.debug(f"Error: {e}")
        finally:
            if session_id is not None:
                NotificationService.remove_websocket_connection(session_id=session_id)
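
As background on why sec-websocket-key can double as a session id: RFC 6455 describes it as a randomly selected 16-byte value, base64-encoded by the client for each handshake. The sketch below reproduces that recipe; `make_websocket_key` is a hypothetical helper for illustration, not something the service needs.

```python
import base64
import os


def make_websocket_key() -> str:
    """Build a key the way RFC 6455 describes: 16 random bytes, base64-encoded."""
    return base64.b64encode(os.urandom(16)).decode("ascii")


key = make_websocket_key()
print(key, len(key))
```

Since the value is chosen by the client, collisions are very unlikely but not under the server's control. A server-generated identifier (for example `uuid.uuid4()`) would be one alternative if that matters for your use case.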
				
			

File upload

The mobile application records the user’s speech and, when they finish, uploads the audio file to S3 through our microservice.

In the code below, each uploaded audio file is accompanied by the media_format, the output_file_name, which is the name of the file that will be written to the S3 output bucket when the Transcribe job is finished, and the session_id mentioned earlier.

As soon as the file is successfully uploaded into S3, the microservice triggers Transcribe to start the job. 

				
@app.post("/upload-audio")
async def upload_audio(
    file: Annotated[UploadFile, File()],
    output_file_name: Annotated[str, Form()],
    session_id: Annotated[str, Form()],
    media_format: Annotated[str, Form()],
):
    # content_type can be missing, so fall back to an empty string.
    if not (file.content_type or "").startswith("audio/"):
        raise HTTPException(status_code=400, detail="Only audio files are allowed")

    # Save the upload locally before handing it to boto3.
    with open(f"assets/{file.filename}", "wb") as audio_file:
        audio_file.write(await file.read())

    s3 = boto3.client("s3", region_name="eu-west-1")

    try:
        s3.upload_file(
            f"assets/{file.filename}",
            "stt-audio-inputs",
            f"audios/{file.filename}",
        )

        logger.info(
            "file uploaded successfully to {} bucket as {}".format(
                "stt-audio-inputs", f"audios/{file.filename}"
            )
        )
        # Start the Transcribe job only after the upload has succeeded.
        return TranscribeRouter.transcribe_service.start_job(
            session_id=session_id,
            input_file_name=file.filename,
            media_format=media_format,
            output_file_name=output_file_name,
        )
    except Exception as e:
        logger.error(f"could not transcribe file ({file.filename}): {e}")
        raise Router.HTTPException(
            status_code=Router.status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"could not transcribe file ({file.filename}): {e}",
        )
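
The media_format field is passed to Transcribe as-is, so it may be worth validating it before the job starts. The following is a minimal sketch of such a check; `resolve_media_format` is a hypothetical helper, and the set of formats reflects the values accepted by the MediaFormat parameter as far as I know, so verify it against the current Transcribe documentation.

```python
from pathlib import PurePosixPath
from typing import Optional

# Values accepted by the MediaFormat parameter of start_transcription_job
# at the time of writing (assumption: check the current documentation).
SUPPORTED_MEDIA_FORMATS = {"mp3", "mp4", "wav", "flac", "ogg", "amr", "webm", "m4a"}


def resolve_media_format(filename: str, media_format: Optional[str] = None) -> str:
    """Return a supported format, taken from the form field or the file extension."""
    candidate = (media_format or PurePosixPath(filename).suffix.lstrip(".")).lower()
    if candidate not in SUPPORTED_MEDIA_FORMATS:
        raise ValueError(f"unsupported media format: {candidate!r}")
    return candidate


print(resolve_media_format("speech.WAV"))
```

In the endpoint, a `ValueError` from this helper could be turned into a 400 response before anything is uploaded to S3.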
				
			

Transcription initialization

The following code block shows the start_job method. The only difference from the previous article is that the output_key and the session_id are stored in Redis, with an expiry of one hour. The output_key is the name of the output file that Transcribe stores in the output bucket.

				
    def start_job(
        self,
        session_id: str,
        output_file_name: str,
        media_format: str,
        input_file_name: str,
    ):
        try:
            output_key = f"{output_file_name}.json"
            logger.debug(
                f"storing session id {session_id} for output {output_key} in cache"
            )
            # Remember for one hour which session is waiting for this output file.
            RedisService.get_redis_client().set(output_key, session_id, ex=3600)
            return self.client.start_transcription_job(
                TranscriptionJobName=output_file_name,
                LanguageCode="en-US",
                MediaFormat=media_format,
                Media={
                    "MediaFileUri": f"s3://stt-audio-inputs/audios/{input_file_name}",
                },
                OutputBucketName="stt-json-outputs",
                OutputKey=output_key,
            )
        except Exception as e:
            logger.error(f"Error: {e}")
            raise
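
The Redis entry is what later connects an output file back to the waiting user. To illustrate the pairing and its expiry without a Redis server, here is a small in-memory stand-in; `ExpiringMap` and `output_key_for` are hypothetical names, and the fake clock only exists so that the expiry can be shown without waiting.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class ExpiringMap:
    """Hypothetical in-memory stand-in for a Redis SET with expiry plus GET."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[str, float]] = {}

    def set(self, key: str, value: str, ex: int) -> None:
        self._items[key] = (value, self._clock() + ex)

    def get(self, key: str) -> Optional[str]:
        item = self._items.get(key)
        if item is None:
            return None
        value, expires_at = item
        if self._clock() >= expires_at:
            del self._items[key]  # expired: behave as if the key never existed
            return None
        return value


def output_key_for(output_file_name: str) -> str:
    """Same naming rule as start_job: the transcript lands under '<name>.json'."""
    return f"{output_file_name}.json"


now = [0.0]  # fake clock, in seconds
cache = ExpiringMap(clock=lambda: now[0])
cache.set(output_key_for("meeting-42"), "session-1", ex=3600)
print(cache.get("meeting-42.json"))  # still within the hour
now[0] = 3601.0
print(cache.get("meeting-42.json"))  # the pairing has expired
```

One consequence of this design is that a transcription finishing after the expiry can no longer be routed to its user, so the expiry should comfortably exceed the longest job you expect.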
				
			

Event configuration

Now we need to configure step 8 of the solution design above, in which S3 sends a notification to an SNS topic. We can achieve this with the Event notifications feature of S3. For every selected event type, such as s3:ObjectCreated:Put and s3:ObjectCreated:Post, an event is sent to the specified topic.
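
The same configuration can also be applied from code instead of the console. The sketch below assumes the SNS topic already exists and that its access policy allows S3 to publish to it; the helper names are hypothetical, and the suffix filter is an optional addition of mine so that objects other than transcript files do not trigger a callback.

```python
from typing import Any, Dict, List, Optional


def build_topic_notification(
    topic_arn: str, events: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Build the NotificationConfiguration payload for an S3 to SNS notification."""
    return {
        "TopicConfigurations": [
            {
                "TopicArn": topic_arn,
                "Events": events or ["s3:ObjectCreated:Put", "s3:ObjectCreated:Post"],
                # Optional: only react to objects whose key ends in .json.
                "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".json"}]}},
            }
        ]
    }


def apply_notification(bucket: str, topic_arn: str) -> None:
    """Attach the configuration to the bucket. Needs boto3 and AWS credentials."""
    import boto3  # imported here so the builder above works without boto3 installed

    s3 = boto3.client("s3", region_name="eu-west-1")
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=build_topic_notification(topic_arn),
    )


# The account id and topic name below are placeholders.
print(build_topic_notification("arn:aws:sns:eu-west-1:123456789012:stt-transcripts"))
```

Calling `apply_notification("stt-json-outputs", topic_arn)` would then replace the bucket's existing notification configuration with this one.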

SNS topic configuration

Next, we create a topic in SNS. The configuration is simple: a standard topic with our microservice as a subscriber. As the HTTP endpoint of the subscription, we enter the public IP of the EC2 instance where the microservice is deployed. Now, you might be asking which path you should use. That’s a good question. Try /notify! We will create it in the next section.
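
For readers who prefer scripting over the console, here is a rough equivalent using boto3. The function names, the topic name and the IP address are placeholders of mine; only `create_topic` and `subscribe` are actual SNS client calls.

```python
def build_endpoint(host: str, path: str = "/notify", scheme: str = "http") -> str:
    """Compose the subscriber URL that SNS will send its POST requests to."""
    return f"{scheme}://{host}/{path.lstrip('/')}"


def create_topic_with_subscriber(topic_name: str, host: str, scheme: str = "http") -> str:
    """Create a standard topic and subscribe the microservice to it.

    Needs boto3 and AWS credentials.
    """
    import boto3  # imported lazily so build_endpoint can be used on its own

    sns = boto3.client("sns", region_name="eu-west-1")
    topic_arn = sns.create_topic(Name=topic_name)["TopicArn"]
    # The protocol has to match the scheme of the endpoint URL.
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol=scheme,
        Endpoint=build_endpoint(host, scheme=scheme),
    )
    return topic_arn


# 203.0.113.10 is a documentation-only address; use the public IP of your instance.
print(build_endpoint("203.0.113.10"))
```

The subscription stays pending until the endpoint confirms it, which is why the /notify endpoint has to be reachable before or shortly after the topic is set up.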

Notification endpoint

First of all, when you add the endpoint as a subscriber, SNS asks it to confirm the subscription: it posts a message of type SubscriptionConfirmation, and the endpoint has to send a GET request to the SubscribeURL contained in that message. To confirm the subscription automatically, we create the confirm_subscription method.

				
					
    async def confirm_subscription(self, event):
        logger.debug(f'preparing to confirm subscription {event["Type"]}')
        try:
            if event["Type"] == "SubscriptionConfirmation":
                logger.debug("event is a subscription event")
                # requests.get is synchronous and cannot be awaited directly, so it
                # runs in a worker thread (needs `import asyncio`, Python 3.9+).
                response = await asyncio.to_thread(requests.get, event["SubscribeURL"])
                logger.debug(f"successfully confirmed subscription {response}")
        except Exception as e:
            logger.debug(f"Error: {e}")
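
To make the shape of the confirmation message concrete, the sketch below parses a shortened, made-up example and extracts the URL to call. `subscribe_url` is a hypothetical helper, and the HTTPS check is an extra precaution of mine rather than part of the original flow.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical, shortened example of what SNS posts right after subscribing.
SAMPLE_CONFIRMATION = json.dumps(
    {
        "Type": "SubscriptionConfirmation",
        "TopicArn": "arn:aws:sns:eu-west-1:123456789012:stt-transcripts",
        "Message": "You have chosen to subscribe to the topic (shortened)",
        "SubscribeURL": "https://sns.eu-west-1.amazonaws.com/?Action=ConfirmSubscription",
    }
)


def subscribe_url(raw_body: str) -> Optional[str]:
    """Return the URL to fetch for a confirmation message, or None otherwise."""
    envelope: Dict[str, Any] = json.loads(raw_body)
    if envelope.get("Type") != "SubscriptionConfirmation":
        return None
    url = envelope.get("SubscribeURL", "")
    # Refuse anything that is not HTTPS before fetching a URL taken from a request body.
    return url if url.startswith("https://") else None


print(subscribe_url(SAMPLE_CONFIRMATION))
```

Since the endpoint is public, anyone can post such a body to it. SNS messages carry a signature, and verifying it before acting on the message is worth considering for anything beyond a demo.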
				
			

Once the subscription is confirmed, SNS notifies the endpoint for every file that is uploaded to the output bucket. Remember that only Transcribe has write access to the output bucket. All other services are denied by the bucket policy.

				
    @Router.router.post("/notify")
    async def notify_job_finished(event=Router.Body()):
        try:
            json_event = json.loads(event)
            await TranscribeRouter.transcribe_service.confirm_subscription(json_event)
            if json_event.get("Type") == "SubscriptionConfirmation":
                # A confirmation message carries no S3 record, so stop here.
                return
            # The S3 event is a JSON string nested inside the SNS message.
            message = json.loads(json_event["Message"])
            key = message["Records"][0]["s3"]["object"]["key"]
            session_id = RedisService.get_redis_client().get(key)
            if session_id is None:
                # The pairing expired or was never stored: nobody to notify.
                logger.error(f"no session found for output {key}")
                return
            session_id = session_id.decode("utf-8")
            s3_response = TranscribeRouter.s3_service.read_transcripted_file(key)
            logger.info(f"preparing for sending response to {session_id}")
            await NotificationService.send_response(
                session_id=session_id, response=s3_response
            )
        except Exception as e:
            logger.debug(f"Error: {e}")
            raise Router.HTTPException(
                status_code=Router.status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Internal Server Error: {e}",
            )
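
The notification arrives as two nested layers of JSON: the SNS envelope, and inside its Message field the S3 event as a string. The sketch below unpacks both on a trimmed, made-up payload. `object_keys_from_sns` is a hypothetical helper; it also decodes the object key, because S3 URL-encodes keys in event notifications, and it tolerates the test event that S3 sends when a notification is first configured.

```python
import json
from typing import List
from urllib.parse import unquote_plus


def object_keys_from_sns(raw_body: str) -> List[str]:
    """Extract the S3 object keys carried by an SNS message of type Notification."""
    envelope = json.loads(raw_body)
    if envelope.get("Type") != "Notification":
        return []
    s3_event = json.loads(envelope["Message"])  # the S3 event is a JSON string
    # Test events have no "Records"; real ones carry URL-encoded keys.
    return [
        unquote_plus(record["s3"]["object"]["key"])
        for record in s3_event.get("Records", [])
    ]


# Hypothetical, trimmed-down notification for an object named "my talk.json".
sample = json.dumps(
    {
        "Type": "Notification",
        "Message": json.dumps(
            {
                "Records": [
                    {
                        "s3": {
                            "bucket": {"name": "stt-json-outputs"},
                            "object": {"key": "my+talk.json"},
                        }
                    }
                ]
            }
        ),
    }
)
print(object_keys_from_sns(sample))  # ['my talk.json']
```

The decoding step matters because the key stored in Redis is the plain file name, so a lookup with the encoded form would miss for names that contain spaces or special characters.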
				
			

After SNS notifies that the job is finished and the output file is ready, the microservice looks up in Redis the session_id that “owns” this output file and reads the content of the file from S3. Using the session_id, it grabs the WebSocket connection from the pool. Finally, it sends the content of the file back to the user.
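
The file that Transcribe writes contains more than the plain text, for example per-word items with timings. If the mobile application only needs the text, the microservice could extract it before sending. A minimal sketch, assuming the usual layout of a Transcribe output file; the sample document is made up and heavily trimmed, and `transcript_text` is a hypothetical helper.

```python
import json
from typing import List

# Hypothetical, heavily trimmed example of a Transcribe output file.
SAMPLE_OUTPUT = json.dumps(
    {
        "jobName": "meeting-42",
        "results": {
            "transcripts": [{"transcript": "Hello from Tirana."}],
            "items": [],
        },
        "status": "COMPLETED",
    }
)


def transcript_text(raw_output: str) -> str:
    """Join the transcript strings found in a Transcribe output document."""
    document = json.loads(raw_output)
    transcripts: List[dict] = document.get("results", {}).get("transcripts", [])
    return " ".join(entry["transcript"] for entry in transcripts)


print(transcript_text(SAMPLE_OUTPUT))
```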

Support

If you liked the content, consider sharing or following me on Twitter.

Thank you for your attention!
