Simple Data Pipeline: A Naive Approach in Go

ANUPAM GOGOI
4 min read · Feb 28, 2022

Introduction

In my previous posts I explained the basic tools needed to create a pipeline in Go. Now I wanted to put them all together in a proof-of-concept project, and below is the result of that effort.

Simple Data Pipeline

We are going to discuss a very simple data pipeline. The following happens in the pipeline:

A file is provided to the pipeline along with some keywords to search for in it.

The pipeline processes the file and, for each search keyword, generates a separate output file containing the lines that contain that keyword.

Pretty simple. But it can be extended into a much larger pipeline; it's just a matter of time and effort.

High-Level Architecture

Here are some important terms used in the implementation.

Stages

The pipeline is built from stages. Each stage is a data-processing unit. Normally, each stage contains one processor, but it is not limited to one: more than one processor can be added to a single stage.

Processors

Processors are the core of the pipeline; they do the actual data processing. A processor can spawn multiple worker goroutines to handle the load concurrently.
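To make the idea concrete, here is a minimal sketch of how the stage and processor abstractions might look. The names (Processor, Stage, StageContext) and signatures are assumptions for illustration; the actual code in the repository may differ.

```go
package pipeline

// StageContext carries data shared across the whole pipeline,
// similar in spirit to MuleSoft's MuleContext (names assumed here).
type StageContext struct {
	Data map[string]interface{}
}

// Processor is a single data-processing unit. It reads from an
// input channel and writes results to an output channel.
type Processor interface {
	Process(ctx *StageContext, in <-chan []byte, out chan<- []byte)
}

// Stage groups one or more processors that run over the same channels.
type Stage struct {
	Name       string
	Processors []Processor
}

// Run starts every processor of the stage in its own goroutine.
func (s *Stage) Run(ctx *StageContext, in <-chan []byte, out chan<- []byte) {
	for _, p := range s.Processors {
		go p.Process(ctx, in, out)
	}
}
```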

There are two stages in this application, and they communicate over a channel. Stage1 takes a file as input, and its processor (Ingest) reads the file concurrently by spawning multiple worker goroutines. The number of goroutines is calculated from the size of the input file and the chunk size used for reading. The data read by the goroutines is sent concurrently to the data channel.
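A rough sketch of how the Ingest processor could derive the worker count and read the file in chunks is shown below. The chunk size, the function names, and the use of ReadAt are assumptions; the repository's implementation may calculate and read differently.

```go
package main

import (
	"fmt"
	"os"
)

const chunkSize = 10 * 1024 * 1024 // assumed chunk size of 10 MB

// ingest reads the file concurrently in fixed-size chunks and
// sends each chunk to the data channel.
func ingest(path string, data chan<- []byte) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return err
	}

	// Number of workers is derived from file size and chunk size.
	workers := int((info.Size() + chunkSize - 1) / chunkSize)
	fmt.Printf("spawning %d ingest workers\n", workers)

	done := make(chan struct{})
	for i := 0; i < workers; i++ {
		go func(offset int64) {
			buf := make([]byte, chunkSize)
			n, _ := f.ReadAt(buf, offset) // each worker reads its own slice of the file
			data <- buf[:n]
			done <- struct{}{}
		}(int64(i) * chunkSize)
	}
	for i := 0; i < workers; i++ {
		<-done
	}
	close(data)
	return nil
}
```

Note that a real implementation would also need to deal with lines that straddle chunk boundaries; that detail is omitted here.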

In Stage2, another processor called Transform runs multiple worker goroutines. They listen on the data channel and process the data concurrently. The processing logic is to search for the keywords provided to the pipeline: for each keyword, it generates a corresponding output file and appends the lines that contain that keyword.
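A simplified sketch of what a Transform worker could do is shown below: scan each chunk line by line and append matching lines to one output file per keyword. The file-naming scheme, the mutex around writes, and the line-oriented scanning are assumptions made for illustration.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"os"
	"strings"
	"sync"
)

// transformWorker consumes chunks from the data channel, searches them
// for each keyword, and appends matching lines to that keyword's output
// file. The mutex serialises file writes across workers.
func transformWorker(data <-chan []byte, keywords []string, mu *sync.Mutex, wg *sync.WaitGroup) {
	defer wg.Done()
	for chunk := range data {
		scanner := bufio.NewScanner(bytes.NewReader(chunk))
		for scanner.Scan() {
			line := scanner.Text()
			for _, kw := range keywords {
				if strings.Contains(line, kw) {
					mu.Lock()
					f, err := os.OpenFile(kw+".out", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
					if err == nil {
						fmt.Fprintln(f, line)
						f.Close()
					}
					mu.Unlock()
				}
			}
		}
	}
}
```

Twenty of these workers would simply be started in a loop, all ranging over the same data channel.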

Source Code

The complete source code of the application can be found here.

TODOs

Currently, error handling is not implemented. Suppose that while processing data in Stage2 the output files cannot be created because of a permission issue. This should immediately be signalled to the worker goroutines running in Stage1; otherwise they will keep producing data and consume the system's memory.

This kind of handling can be done using a Context; I have left it for later since my focus was only on the POC.
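One way to wire that in would be a context.Context created at pipeline start and cancelled by Stage2 on failure, so Stage1 workers stop producing. A minimal sketch of the idea (the function names, channel wiring, and error path are assumptions):

```go
package main

import (
	"context"
	"fmt"
	"os"
	"sync"
)

// producer models a Stage1 worker: it stops sending as soon as the
// context is cancelled, so it never blocks on a channel nobody drains.
func producer(ctx context.Context, data chan<- []byte, chunk []byte) {
	select {
	case data <- chunk:
	case <-ctx.Done():
		fmt.Println("ingest worker stopped:", ctx.Err())
	}
}

// consumer models a Stage2 worker: it cancels the whole pipeline when
// it cannot create an output file (e.g. a permission problem).
func consumer(cancel context.CancelFunc, outPath string) {
	if _, err := os.Create(outPath); err != nil {
		fmt.Println("transform failed, cancelling pipeline:", err)
		cancel()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	data := make(chan []byte, 1)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		producer(ctx, data, []byte("example chunk"))
	}()

	consumer(cancel, "/nonexistent-dir/out.txt") // fails, cancelling Stage1 workers
	wg.Wait()
}
```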

Load Testing

This is the most exciting part. I performed a load test processing a file of 132 MB. Below are the settings.

  1. Stage1: The processor (Ingest) in Stage1 calculates the number of workers automatically to read the input file. For this scenario, it calculated 13 workers.
  2. Stage2: For the Stage2 processor (Transform), I assigned 20 workers.

After running the application, we can see the output below.

It took 171 ms to process the 132 MB file and search for a keyword. Impressive!

Conclusion

In this article, I have explained how to use Go's built-in concurrency support to create a small data pipeline. I can see immense possibilities for extending this pipeline. Does this pipeline ring a bell? Doesn't it look like a flow in MuleSoft ESB or WSO2 EI? The main purpose of an ESB is to carry a payload and modify it along the way. What we are doing in the pipeline is almost the same: we work on the data provided to the pipeline. MuleSoft uses a MuleContext across the whole application to provide access to application-specific data; the same can be done with the StageContext implemented in this application.

So, can the Go pipeline replace Mule ESB or WSO2 EI? Well, nothing is impossible; it's a matter of time and effort. In a previous project, I implemented an MVP for an ESB. The concepts are, in fact, the same :).

Happy learning!!!
