Building a Production Data Pipeline - Study Notes from Data Engineering with Python Ch 11

You learned the individual tools. You learned the deployment strategies. Now Chapter 11 of Data Engineering with Python by Paul Crickard puts it all together. This is the chapter where you build a complete, production-grade data pipeline from start to finish.

The goal: read files from a data lake, load them into a staging table, validate the data, and then move it into a data warehouse. Along the way, you use version control, monitoring counters, environment variables, and Great Expectations for validation. Everything you learned in the previous chapters comes together here.

Setting Up Test and Production Environments

Before building anything, you need two environments: test and production. Crickard uses PostgreSQL for both, managed through PgAdmin4.

Each environment gets two tables:

  • staging for holding incoming data before validation
  • warehouse for the final, validated data

So in total you create four tables. Two in the test database, two in the production database. All four tables have the same structure: userid, name, age, street, city, state, and zip.

A handy shortcut: create the staging table manually in PgAdmin4 once, then use the “CREATE Script” feature to generate the SQL. Change the table name from “staging” to “warehouse” and run it. Same trick works when you set up the production database. No need to click through the UI four times.
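The same four-table setup can also be scripted. Here is a minimal sketch using Python's stdlib sqlite3 as a stand-in for PostgreSQL (the book does this through PgAdmin4; the column types are assumptions based on the fields listed above):

```python
import sqlite3

# Same structure for staging and warehouse -- only the table name differs.
DDL = """
CREATE TABLE IF NOT EXISTS {name} (
    userid INTEGER PRIMARY KEY,
    name   TEXT,
    age    INTEGER,
    street TEXT,
    city   TEXT,
    state  TEXT,
    zip    TEXT
)
"""

def create_tables(conn):
    for table in ("staging", "warehouse"):
        conn.execute(DDL.format(name=table))
    conn.commit()

# One connection per environment; the book uses separate
# test and production PostgreSQL databases.
test_db = sqlite3.connect(":memory:")
create_tables(test_db)
```

Run the same function against both connections and all four tables come out identical, which is exactly what the pipeline needs.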

Creating a Simulated Data Lake

A data lake is just a place where files live on disk. In the real world, this would probably be HDFS or something similar. For this exercise, Crickard keeps it simple: a folder on your machine.

You populate it with Python and the Faker library. The script generates 1,000 JSON files, each representing a fake person with a userid, name, age, and address fields. Each file is named after the person. The userid increments with every file so it can serve as a primary key.

Nothing fancy here. The point is to have a realistic set of source files for the pipeline to consume.
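The book generates the files with Faker; as a self-contained sketch that runs without third-party packages, here is the same idea with stdlib random (the name pools, address fields, and file-naming scheme are illustrative assumptions):

```python
import json
import random
import tempfile
from pathlib import Path

FIRST = ["Ada", "Grace", "Alan", "Edsger", "Barbara"]
LAST = ["Lovelace", "Hopper", "Turing", "Dijkstra", "Liskov"]

def build_data_lake(folder, count=1000):
    """Write one JSON file per fake person, named after the person."""
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    for userid in range(1, count + 1):  # increments so it can be a primary key
        person = {
            "userid": userid,
            "name": f"{random.choice(FIRST)} {random.choice(LAST)}",
            "age": random.randint(18, 90),
            "street": f"{random.randint(1, 999)} Main St",
            "city": "Albuquerque",
            "state": "NM",
            "zip": "87101",
        }
        # Random names collide, so the userid is appended to keep
        # one file per person.
        path = folder / f"{person['name'].replace(' ', '_')}_{userid}.json"
        path.write_text(json.dumps(person))
    return folder

lake = build_data_lake(tempfile.mkdtemp(), count=1000)
```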

The Pipeline Architecture

The finished pipeline has six processor groups (one of them an optional add-on), each handling one step:

  1. ReadDataLake - picks up files from the data lake
  2. ScanLake - optional monitoring for specific records
  3. InsertStaging - loads data into the staging table
  4. QueryStaging - validates record counts
  5. ValidateStaging - runs Great Expectations checks
  6. InsertWarehouse - moves validated data to the warehouse

Each group is self-contained. Each one gets version controlled separately in NiFi Registry. This means you can reuse any group in a different pipeline without rebuilding it.

Reading the Data Lake

The first processor group uses three processors and an output port.

GetFile reads JSON files from the data lake folder. You set the input directory, keep “Keep Source File” set to true (so the originals stay put), and add a regex file filter to only grab JSON files. If you skip the filter and there are non-JSON files in the folder, NiFi will try to process them and fail.

EvaluateJsonPath extracts the fields from each JSON file and puts them into flowfile attributes. You create a property for each field: userid, name, age, street, city, state, zip. The destination is set to “flowfile-attribute” so downstream processors can access these values.

UpdateCounter is purely for monitoring. It counts every flowfile that passes through. You give the counter a name like “datalakerecordsprocessed” and set the delta to 1. This does not change your data at all. It just gives you a running tally you can check later to verify everything got processed.

After building this group, you version control it in NiFi Registry. Create a bucket (Crickard uses one called “DataLake”), save the flow, and you are done. This group is now reusable. Any pipeline that needs to read from the data lake can import it.

Scanning the Data Lake (Bonus Feature)

Here is a neat addition. Suppose another department needs to watch the data lake for VIP customers. Instead of building a whole new pipeline, you just attach another processor group to the ReadDataLake output.

The ScanLake group uses a ScanContent processor. You give it a dictionary file containing names you want to watch for. When a file comes through that matches a name in the dictionary, the processor routes it to a PutSlack processor. Everyone else flows through normally.

This is a good example of why breaking pipelines into processor groups matters. You did not have to modify the ReadDataLake group at all. You just plugged in a new consumer.

Inserting Data Into Staging

This group is straightforward. One PutSQL processor does all the work.

The SQL statement uses flowfile attributes to build the INSERT query. But here is the interesting part: the table name is not hardcoded. It uses a NiFi variable called ${table}, set to “staging” in the test environment.

Why does that matter? Because when you deploy to production, you just change the variable value. The processor stays the same.

You also configure a JDBC connection pool for PostgreSQL. And there are two important settings: batch size (how many records to process at once) and rollback on failure. Setting rollback on failure to true gives you atomicity. If one record in a batch fails, everything rolls back. No partial inserts.
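What rollback on failure buys you is easy to demonstrate. In this sketch, sqlite3 stands in for the JDBC connection pool: a batch of three inserts contains one bad record, and the whole batch rolls back rather than leaving two rows behind:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (userid INTEGER PRIMARY KEY, name TEXT)")

batch = [
    (1, "Ada Lovelace"),
    (2, "Grace Hopper"),
    (1, "Duplicate Key"),   # violates the primary key -> the batch fails
]

try:
    # One transaction per batch: commit on success, roll back on error.
    with conn:
        conn.executemany("INSERT INTO staging VALUES (?, ?)", batch)
except sqlite3.IntegrityError:
    pass  # the failed batch left no partial inserts behind

count = conn.execute("SELECT count(*) FROM staging").fetchone()[0]
print(count)  # 0 -- nothing committed
```

Without the transaction wrapper, the first two rows would survive the failure and staging would hold a partial batch, which is exactly the state the setting exists to prevent.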

Another UpdateCounter tracks how many records got inserted. You can compare this counter to the one from the ReadDataLake group. If they match, every file made it into staging.

Querying the Staging Database

Before moving data to the warehouse, you want to verify it actually landed in staging correctly. This group does a simple validation: count the records and check the number.

ExecuteSQLRecord runs a select count(*) from ${table} query against the staging table. It returns a JSON object with the count. An EvaluateJsonPath processor extracts the count as an attribute called “recordcount.”

Then RouteOnAttribute checks whether the count meets your threshold. Crickard uses a NiFi expression that checks if the record count is greater than or equal to 1,000:

${recordcount:ge(1000)}

If yes, the flowfile moves on. If not, it gets routed elsewhere (or terminated). This is a basic sanity check. You generated 1,000 files, so you expect 1,000 records in staging. If the count is lower, something went wrong.
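The routing logic mirrors the expression above; as a tiny Python equivalent (function and route names are illustrative, not NiFi's):

```python
# Mirror of the NiFi expression ${recordcount:ge(1000)}.
def route_on_count(recordcount, threshold=1000):
    """Route "matched" when staging holds at least the expected rows."""
    return "matched" if int(recordcount) >= threshold else "unmatched"

print(route_on_count("1000"))  # matched -- flowfile attributes are strings
print(route_on_count("999"))   # unmatched
```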

You could add more validation queries here. If you have data analysts on your team, ask them what queries would help verify the data looks right.

Validating With Great Expectations

The RouteOnAttribute check is useful but limited. For more thorough validation, Crickard brings back Great Expectations, which was introduced in Chapter 7.

The setup process:

  1. Create a project folder and run great_expectations init
  2. Choose “Relational database (SQL)” and configure it to connect to your PostgreSQL staging table
  3. Great Expectations auto-generates a validation suite based on the table structure
  4. Edit the suite in a Jupyter notebook to remove overly strict rules (like expecting exactly zero rows)
  5. Generate a “tap” file, which is a standalone Python script that runs the validation

In NiFi, the ExecuteStreamCommand processor runs the tap script. The script outputs a JSON result indicating pass or fail. An EvaluateJsonPath processor extracts the result, and a RouteOnAttribute processor checks if it starts with “pass.”

If validation passes, the flowfile moves to the final step. If it fails, you stop and investigate. No bad data gets into the warehouse.
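The real tap script is generated by Great Expectations, but its shape is simple: run checks, print a JSON result that downstream processors can route on. A simplified stand-in (the checks and field names here are assumptions, not the generated tap's exact output):

```python
import json

def validate(rows):
    """Stand-in for the Great Expectations tap: run a few checks and
    report pass/fail. A real tap runs a full expectation suite."""
    failures = []
    for row in rows:
        if not isinstance(row.get("age"), int) or not (0 < row["age"] < 120):
            failures.append(("age", row))
        if not row.get("name"):
            failures.append(("name", row))
    result = "pass" if not failures else "fail"
    return {"result": result, "failed_checks": len(failures)}

# ExecuteStreamCommand captures stdout; EvaluateJsonPath extracts the
# result, and RouteOnAttribute checks whether it starts with "pass".
report = validate([{"name": "Ada Lovelace", "age": 36}])
print(json.dumps(report))
```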

Moving Data to the Warehouse

The last processor group reads everything from staging and inserts it into the warehouse.

An ExecuteSQLRecord processor runs select * from ${table} against staging. The record writer is set to JsonRecordSetWriter with “One Line per Object” output grouping. A SplitText processor breaks the results into individual records. An EvaluateJsonPath processor extracts the fields (same configuration as the ReadDataLake group).

Finally, a PutSQL processor inserts each record into the warehouse table. The SQL statement is the same INSERT query used for staging, but the table variable points to “warehouse” instead.

This is the end of the pipeline. All relationships on the final PutSQL processor are terminated.
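Collapsed into SQL, the whole final group amounts to one set-based statement. A sketch with sqlite3 standing in for PostgreSQL (NiFi splits the work across several processors so each record flows through individually):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
for table in ("staging", "warehouse"):   # same structure, different name
    conn.execute(f"CREATE TABLE {table} (userid INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO staging VALUES (?, ?)",
                 [(1, "Ada Lovelace"), (2, "Grace Hopper")])

# NiFi does this with ExecuteSQLRecord -> SplitText -> EvaluateJsonPath ->
# PutSQL; as a single set-based statement it is just:
with conn:
    conn.execute("INSERT INTO warehouse SELECT * FROM staging")

moved = conn.execute("SELECT count(*) FROM warehouse").fetchone()[0]
print(moved)  # 2
```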

Deploying to Production

Deployment follows the strategy from Chapter 10. The steps are quick:

  1. Open your production NiFi instance
  2. Import the processor groups from the registry (use the latest version of each)
  3. Update the variables on each group to point to the production database

That is it. The table names stay the same (staging and warehouse). Only the database connection changes. Start all the processor groups, and data flows through the production pipeline.

Key Takeaways

  • Separate test and production environments. Same table structure, different databases. Variables handle the differences.
  • Break pipelines into processor groups. Each group does one job. Version control each one independently. Reuse them across pipelines.
  • Monitor with counters. UpdateCounter processors give you a running count of records processed. Compare counts at different stages to spot data loss.
  • Validate before loading. Use staging tables and Great Expectations to catch problems before they reach the warehouse.
  • Use NiFi variables for configuration. Never hardcode table names or connection strings. Variables make the same pipeline work across environments.
  • Atomicity matters. Rollback on failure prevents partial inserts from leaving your database in a bad state.

My Take

This chapter is the payoff for the entire second section of the book. Every concept from Chapters 7 through 10 (staging, validation, version control, deployment strategies, monitoring) shows up in this pipeline. It is satisfying to see them come together in a real project.

The pipeline itself is not complicated. Read files, insert into staging, validate, move to warehouse. But that simplicity is the point. The value is not in the complexity of the data flow. It is in the production features wrapped around it. Counters for monitoring. Great Expectations for validation. Variables for environment management. Rollback for atomicity. Processor groups for reuse and version control.

If you stripped all of that away, you could build this pipeline with a handful of processors. But it would break in production. And when it broke, you would have no idea where things went wrong or how to fix it.

The ScanLake example is a nice touch. It shows that well-structured pipelines are extensible. When another team needs something from your data, you do not rebuild. You plug in.

One thing that is missing: error handling paths. The pipeline validates data, but what happens when validation fails? In a real production system, you would want alerts, retry logic, and dead letter queues for records that cannot be processed. Crickard acknowledges that you could add more validation, but the chapter stops short of building a full error recovery system.

Still, this is a solid capstone chapter for the batch processing section. Next up: the book moves into real-time data with Apache Kafka.



About BookGrill.net

BookGrill.net is a technology book review site for developers, engineers, and anyone who builds things with code. We cover books on software engineering, AI and machine learning, cybersecurity, systems design, and the culture of technology.
