Categorize incoming data with Elixir GenStage
Pankaj Agarwal
June 27, 2020

We are working on a communication system, and one requirement was to set up automation for incoming messages. For example, if a message contains a greeting keyword such as hi or hello, we need to reply to the user with a welcome message. This is a very simple example, but there are more complex cases: detecting the user's language and responding in the same language, or, if the user wants to ask a question, sending a help template that guides them to the help workflow.

To set up these custom automation workflows, we decided that every message should be handled based on its intent, which means every incoming message should be labeled with a specific tag. For example, if the message starts with hello, hi, or namaste, it should get the greeting tag; if it is the first message from a user, it should get the new user tag; and so on.
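The labeling rule above can be sketched in plain Elixir. This is only an illustration, not our production code: the module name `MessageTagger`, the `first_message?` option, and the tag atoms are hypothetical names chosen for the example.

```elixir
defmodule MessageTagger do
  # Greeting keywords taken from the examples in the text.
  @greetings ~w(hello hi namaste)

  # Returns the list of tags that apply to a message body.
  # `first_message?` is a hypothetical flag the caller would look up elsewhere.
  def tags(body, opts \\ []) when is_binary(body) do
    first_message? = Keyword.get(opts, :first_message?, false)

    []
    |> add_if(greeting?(body), :greeting)
    |> add_if(first_message?, :new_user)
    |> Enum.reverse()
  end

  # A message is a greeting when its first word is one of the keywords.
  defp greeting?(body) do
    case Regex.run(~r/^\s*(\p{L}+)/u, body) do
      [_, word] -> String.downcase(word) in @greetings
      nil -> false
    end
  end

  defp add_if(tags, true, tag), do: [tag | tags]
  defp add_if(tags, false, _tag), do: tags
end
```

Matching on the whole first word, rather than on a prefix, keeps a message like "history lesson" from being tagged as a greeting.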

To experiment with this workflow, we wanted a way to handle a lot of incoming requests and run a specific check on each message. Since we want to give our users a good experience, these additional checks should not affect performance. This is where we started looking at GenStage.

You can read more about GenStage in its documentation, but in brief, GenStage is an abstraction built on top of GenServer that provides a way to create a producer/consumer architecture. It also has a very nice API for back-pressure, which helps avoid buffer overflow issues.

Since the number of incoming requests can be very high, we created a GenStage producer that holds all incoming messages in a queue and releases them according to consumer demand, which avoids buffer overflow issues. So now we just need to sanitize the params we receive in a request and send them to our producer.
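A producer like this could look roughly as follows. It is a minimal sketch based on the queue-plus-demand pattern shown in the GenStage documentation, not our exact implementation; the module name `MessageProducer` and the `enqueue/1` function are hypothetical. The `Mix.install/1` line only makes the snippet runnable as a standalone script; in a Mix project you would add `gen_stage` to `mix.exs` instead.

```elixir
# Assumption: running as a standalone .exs script.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule MessageProducer do
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, Keyword.put_new(opts, :name, __MODULE__))
  end

  # Hypothetical entry point: called once the request params are sanitized.
  def enqueue(message), do: GenStage.cast(__MODULE__, {:enqueue, message})

  @impl true
  def init(:ok) do
    # State is the queue of waiting messages plus the demand not yet satisfied.
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_cast({:enqueue, message}, {queue, demand}) do
    dispatch(:queue.in(message, queue), demand, [])
  end

  @impl true
  def handle_demand(incoming, {queue, demand}) do
    dispatch(queue, demand + incoming, [])
  end

  # Emit as many queued messages as there is demand for, in arrival order.
  defp dispatch(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, message}, rest} -> dispatch(rest, demand - 1, [message | events])
      {:empty, queue} -> {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
```

Messages that arrive while no consumer is asking simply wait in the queue, and demand that arrives while the queue is empty is remembered until the next message comes in.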

The producer tracks the demand and waits for consumers to ask for incoming messages. Next, we add a tagger consumer that adds tags based on intent.

We then set up several consumers, where each consumer checks for a specific intent and sends the response. And as you have already guessed, all the consumers run in parallel.
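As an illustration, a consumer that checks for the greeting intent might look like the sketch below. The module name `GreetingConsumer`, the `:producer` and `:notify` options, and the `{:tagged, :greeting, message}` tuple are hypothetical; a real consumer would send the welcome response instead of notifying a process. The `Mix.install/1` line is only there so the snippet runs as a standalone script.

```elixir
# Assumption: running as a standalone .exs script.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule GreetingConsumer do
  use GenStage

  @greetings ~w(hello hi namaste)

  # opts (hypothetical): :producer is the stage to subscribe to,
  # :notify is a pid that receives every message tagged as a greeting.
  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{notify: Keyword.fetch!(opts, :notify)}
    producer = Keyword.fetch!(opts, :producer)
    {:consumer, state, subscribe_to: [{producer, max_demand: 10}]}
  end

  @impl true
  def handle_events(messages, _from, state) do
    for %{body: body} = message <- messages, greeting?(body) do
      send(state.notify, {:tagged, :greeting, message})
    end

    # A consumer never emits events, so the event list is always empty.
    {:noreply, [], state}
  end

  defp greeting?(body) do
    case Regex.run(~r/^\s*(\p{L}+)/u, body) do
      [_, word] -> String.downcase(word) in @greetings
      nil -> false
    end
  end
end
```

One design point to keep in mind: with GenStage's default demand dispatcher, each event is delivered to only one of the subscribed consumers. If every consumer should see every message, the producer can return `dispatcher: GenStage.BroadcastDispatcher` as an option from its `init/1`.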


Now we can just create new consumers and write the logic to label a message with a tag. We are still figuring out how to share state between multiple consumers.

As you can see, a lot happens for a single request. To make it a bit faster, we just take the params from the request, close the socket, and pass the message to a new process. For this we are using the Oban library for Elixir.

The next thing we need to finalize is testing: how to write test cases for GenStage producers and consumers the Elixir way.

I will talk about this in future articles.