Stream processing with Python Faust: Part II – Streaming pipeline


This blog post is the second part of a series on how to set up a stream processing pipeline with Python and the Faust library. If you haven’t read the first part on the general concepts and you’re not familiar with Faust, I strongly recommend that you check it out.

Today, we’ll set up a simple processing pipeline consisting of a couple of stages. This is a common use case for Kafka and a fun way to explore Faust.

OK, now let’s get our hands dirty!

Project layout 🔗

One of the most frustrating things when starting a new project and learning a new technology is figuring out the best way to set up your directory layout. It’s perfectly fine to start your Faust project as a single Python module, but if you’re planning to go anywhere beyond a single agent, it’s probably better to structure your project from the start.

Faust provides a suggested layout for medium/large projects. This layout assumes that you’re going to distribute your project as a pip installable library. We’re not going to do that, but we’ll still reuse most of the suggested directory structure.

Here is the layout that we are going to implement

+ pipeline/
    + pipeline/
        - __init__.py
        - __main__.py
        - app.py
        + fetcher/
            - __init__.py
            - agents.py
            - models.py
        + normaliser/
            - __init__.py
            - agents.py
            - models.py
    + requirements/
        - base.txt
        - prod.txt
        - test.txt
    + tests/
        - __init__.py
        - test_normaliser.py
    - Dockerfile
    - docker-compose.yaml

The top level pipeline folder is your project root and holds all the other files and directories. We are not going to describe all the standard files and directories used for a dockerised Python app. The main directory we are interested in today is the nested pipeline/ app directory.

Let’s start by creating the project.

mkdir pipeline && cd pipeline/

While we are there we’ll create the Docker and docker-compose files

touch Dockerfile && \
touch docker-compose.yaml

Now, still from within the pipeline project directory, we’ll create the nested pipeline app directory and the other top level directories

mkdir pipeline && \
mkdir requirements && \
mkdir tests

Good, we have the top level directories ready.

Let’s add our requirements before getting onto Faust itself.

printf "%s\n" "faust==1.10.4" "pytz==2021.1" "requests==2.25.1" > requirements/base.txt && \
printf "%s\n" "-r base.txt" > requirements/prod.txt && \
printf "%s\n" "-r base.txt" "pytest==6.2.3" > requirements/test.txt

We can now move into our main app directory.

cd pipeline/

Faust entry point 🔗

Now that we are in our main app directory (pipeline/pipeline/), we’ll create the top level files for our Faust project

touch __init__.py && \
touch __main__.py && \
touch app.py

We’re also going to create the top level directories for our pipeline services

mkdir fetcher && \
mkdir normaliser

We can now create the Faust app in app.py

# app.py
import faust


VERSION = 1

PROJECT = "pipeline"  # our root directory
ORIGIN = "pipeline"  # our app directory

AUTODISCOVER = [
    f"{ORIGIN}.fetcher",
    f"{ORIGIN}.normaliser",
]

BROKER = "kafka://kafka:9092"

app = faust.App(
    PROJECT,
    version=VERSION,
    autodiscover=AUTODISCOVER,
    origin=ORIGIN,
    broker=BROKER,
)


def main() -> None:
    # HACK:
    # Waiting for kafka to be ready.
    # In production, you would add a wait script to your docker-compose
    # to make sure that kafka is ready when Faust starts.
    import time; time.sleep(10)

    app.main()

We are done with the basic setup for the Faust application and we’ll be able to use it in the __main__.py file in a second.

Most of what we’ve done here is instantiate the application with fairly standard parameters. Two things to note:

  • We’ve enabled autodiscovery, but it’s not strictly required.
  • One of the cool things about Faust is its integration with RocksDB, but we haven’t used any store here (see the sketch below for what that could look like).
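
For the curious, here is a minimal sketch, not part of our pipeline, of what a RocksDB-backed table could look like. The table name, the counting logic and the rocksdb store URL are illustrative only, and you’d need the optional faust[rocksdb] extra installed.

# illustrative only, not used in this tutorial's pipeline
import faust

app = faust.App(
    "pipeline",
    broker="kafka://kafka:9092",
    store="rocksdb://",  # persist table state locally with RocksDB
)

users_topic = app.topic("fetcher-0.0.1", value_serializer="json")

# Tables are distributed key/value stores backed by a Kafka changelog topic.
nationality_counts = app.Table("nationality-counts", default=int)


@app.agent(users_topic)
async def count_nationalities(users):
    # Repartition the stream by nationality so that each table key is always
    # updated by the same worker, then keep a running count per nationality.
    async for user in users.group_by(lambda u: u["nat"], name="by-nat"):
        nationality_counts[user["nat"]] += 1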

OK, we can now import this from our __main__.py file and that’s going to be our project entry point.

# __main__.py
from .app import main


main()

The basic scaffolding is there; we’ll now step back for a few minutes to add the Docker setup.

Docker and docker-compose setup 🔗

Move one level up to the root project directory and copy the following into the Dockerfile

FROM python:3.8.6

WORKDIR /app

COPY requirements/*.txt /app/requirements/
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements/prod.txt

COPY pipeline/*.py /app/pipeline/
COPY pipeline/fetcher/ /app/pipeline/fetcher/
COPY pipeline/normaliser/ /app/pipeline/normaliser/

ENTRYPOINT ["python", "-m", "pipeline", "worker", "-l", "info"]

We’ll now use the docker-compose file to start Zookeeper, Kafka and Faust services.

You can use the following configuration for that purpose.

version: "3"
services:
  zookeeper:
    image: "wurstmeister/zookeeper:latest"
    ports:
      - 2181:2181
  kafka:
    image: "wurstmeister/kafka:latest"
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    volumes:
      - /kafka
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - ALLOW_PLAINTEXT_LISTENER=yes
  faust:
    build: .
    depends_on:
      - kafka
    volumes:
      - ./:/app/

If you are not familiar with Docker and docker-compose, you can read through our series of tutorials. It’s a pretty basic setup: Kafka (which depends on Zookeeper) plus our Faust service, which is built from the Dockerfile defined above.

Pipeline services: Faust agents 🔗

We’re now going to create the actual streaming pipeline logic. The basic idea of our pipeline is:

  1. a fetcher service that is going to regularly fetch data from an external API (https://randomuser.me/api/ for the sake of this example)
  2. a normaliser service that is responsible for normalising the data to the expected output topic, filtering out unwanted data, renaming fields, etc.

The stream processors in Faust are called agents, and they are the key pieces of our streaming pipeline. Before we can create them, we need to define the input and output topics and the models used by these topics.

Fetcher 🔗

If you’re not in the app pipeline directory, please move into it. From there we’ll move into the fetcher folder and create its associated files.

cd fetcher && \
touch __init__.py && \
touch agents.py && \
touch models.py

OK, we’ve created all the files and are now in the fetcher directory. We’ll start by defining our topic and our models. In the models.py file, we are going to add the following.

# fetcher/models.py
import faust

from pipeline.app import app


class RawUser(faust.Record):
    id: dict
    gender: str
    name: dict
    location: dict
    email: str
    login: dict
    dob: dict
    registered: dict
    phone: str
    cell: str
    picture: dict
    nat: str


output_schema = faust.Schema(
    key_type=str,
    value_type=RawUser,
    key_serializer="json",
    value_serializer="json",
)

output_topic = app.topic("fetcher-0.0.1", schema=output_schema)

There are a few things there, but nothing very complicated. The first thing we did was define the output model, RawUser, which is basically the value type for what our fetcher service produces. Then we defined a Faust Schema, which specifies the key and value types and their serializers. This schema is then used to define the output topic of our service, along with the topic name.
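
If you want a quick feel for how the model behaves, here is a purely illustrative snippet; the payload below is made up and much smaller than what randomuser.me actually returns.

# Illustrative only: a RawUser behaves like a typed record (made-up payload).
from pipeline.fetcher.models import RawUser

user = RawUser(
    id={"name": "SSN", "value": "123-45-6789"},
    gender="female",
    name={"title": "Ms", "first": "Ada", "last": "Lovelace"},
    location={"city": "London"},
    email="ada@example.com",
    login={"username": "ada"},
    dob={"date": "1815-12-10"},
    registered={"date": "2021-01-01"},
    phone="",
    cell="",
    picture={},
    nat="GB",
)

assert user.email == "ada@example.com"
assert user.asdict()["name"]["first"] == "Ada"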

Once the models are ready, we can define a crontab that regularly calls our target API and produces raw users to our output topic.

# fetcher/agents.py
import logging

import pytz
import requests

from pipeline.app import app
from .models import output_topic


logger = logging.getLogger(__name__)


@app.crontab("* * * * *", timezone=pytz.timezone("US/Eastern"), on_leader=True)
async def fetch():
    # NOTE: requests is blocking inside this async function; that's fine for
    # a demo, but a real worker would use an async HTTP client (e.g. aiohttp).
    response = requests.get("https://randomuser.me/api/?results=50")
    response.raise_for_status()

    data = response.json()
    for result in data["results"]:
        key = result["id"]["value"]
        if not key:
            # randomuser.me returns some users with a None or empty ID value;
            # we don't want to process these.
            continue

        logger.info("Fetched user with ID %(user_id)s", {"user_id": key})

        await output_topic.send(key=key, value=result)

We have set up our initial service for fetching data from an external API. This service calls the API every minute on a cron job (the app.crontab decorator) and produces the fetched results to the output_topic (fetcher-0.0.1). We use the user’s id.value as the message key.
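
If you want to double check that messages are actually landing on the topic once everything is running, you can peek at them with the console consumer that ships with Kafka, assuming the image puts the standard tools on its PATH (the wurstmeister image does); something like:

docker-compose exec kafka kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --topic fetcher-0.0.1 \
    --from-beginning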

Normaliser 🔗

To build a proper data processing pipeline, we need at least a couple of services, so let’s create our normaliser service.

# assumes you were in the fetcher/ directory
cd .. && cd normaliser && \
touch __init__.py && \
touch agents.py && \
touch models.py

As with the fetcher, we’ll start with the models module, but this time we’ll need both an input model and an output model.

Let’s get onto the models.py file and edit it.

# normaliser/models.py
import faust

from pipeline.app import app


class RawUser(faust.Record):
    id: dict
    gender: str
    name: dict
    location: dict
    email: str
    login: dict
    dob: dict
    registered: dict
    phone: str
    cell: str
    picture: dict
    nat: str


input_schema = faust.Schema(
    key_type=str,
    value_type=RawUser,
    key_serializer="json",
    value_serializer="json",
)

input_topic = app.topic("fetcher-0.0.1", schema=input_schema)


class NormalisedUser(faust.Record):
    id: str
    name: str
    cell: str
    email: str


output_schema = faust.Schema(
    key_type=str,
    value_type=NormalisedUser,
    key_serializer="json",
    value_serializer="json",
)

output_topic = app.topic("normaliser-0.0.1", schema=output_schema)

So, in the normaliser models - the second step of our pipeline - we now have an input and an output schema because the normaliser service does transform the data (filters out fields, modifies fields, etc.).

We can now move onto the agents code

# normaliser/agents.py
import logging

from pipeline.app import app
from .models import input_topic, output_topic


logger = logging.getLogger(__name__)


def normalise_user(raw_user):
    return {
        "id": raw_user["id"]["value"],
        "name": f"{raw_user['name']['first']} {raw_user['name']['last']}",
        "cell": raw_user["cell"],
        "email": raw_user["email"],
    }


@app.agent(input_topic)
async def consume(stream):
    async for record in stream:
        raw_user = record.asdict()

        normalised_user = normalise_user(raw_user)
        key = normalised_user["id"]

        logger.info("Normalised user with ID %(user_id)s", {"user_id": key})

        await output_topic.send(key=key, value=normalised_user)

The normalisation is quite simple here. We just filter out some fields and flatten a couple of other fields, namely id and name.
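
Since normalise_user is a plain function, it is also easy to unit test. Here is a minimal sketch of what tests/test_normaliser.py could look like (the payload is made up and trimmed down to the fields we actually use):

# tests/test_normaliser.py
from pipeline.normaliser.agents import normalise_user


def test_normalise_user_flattens_id_and_name():
    raw_user = {
        "id": {"name": "SSN", "value": "123-45-6789"},
        "name": {"title": "Ms", "first": "Ada", "last": "Lovelace"},
        "cell": "0700-000-000",
        "email": "ada@example.com",
    }

    assert normalise_user(raw_user) == {
        "id": "123-45-6789",
        "name": "Ada Lovelace",
        "cell": "0700-000-000",
        "email": "ada@example.com",
    }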

This is the first Faust agent that we actually define, using the app.agent decorator. It iterates over the stream of raw users that the fetcher service produces, normalises each individual user and then produces them to the normaliser output topic for further processing. Another approach here could have been to use a sink to perform additional actions on the processed messages, as sketched below.
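
For reference, here is roughly what a sink-based variant could look like. It’s only a sketch: with a topic sink, Faust forwards whatever the agent yields to that topic, but it doesn’t give you explicit control over the message key, which is one reason the version above calls output_topic.send directly.

# normaliser/agents.py - alternative sketch using a sink
@app.agent(input_topic, sink=[output_topic])
async def consume(stream):
    async for record in stream:
        normalised_user = normalise_user(record.asdict())
        # Whatever we yield is forwarded to every sink (here, output_topic).
        yield normalised_user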

Running the pipeline 🔗

We are pretty much done with our streaming pipeline, and it is now ready to test. Go to the top level project directory and run the following

docker-compose up --abort-on-container-exit

It will take a bit of time for the different services to become ready and for the cron job to actually run (and the logs might be a bit verbose), but after this wait you should start seeing logs of your users being fetched and normalised!
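
If the combined output gets too noisy, you can follow just the Faust service logs from another terminal

docker-compose logs -f faust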

Conclusion 🔗

This was a bit of a longer workout today; I hope your abs are not burning too much… and hopefully this gave you a decent overview of Faust in the context of a simple streaming pipeline use case.
