Building a 311 Data Pipeline - Study Notes from Data Engineering with Python Ch 6
The previous chapters taught you the individual tools: Python, NiFi, Airflow, databases, data cleaning. Chapter 6 of Data Engineering with Python by Paul Crickard puts them all together into one real project.
The project is a data pipeline that pulls 311-style city service data from the SeeClickFix API, transforms it, loads it into Elasticsearch, and visualizes it with Kibana. It runs every 8 hours. Crickard actually uses this pipeline himself to monitor quality-of-life issues in his county: graffiti, abandoned vehicles, needles, even complaints about people not social distancing during COVID.
This is the first “build a real thing from scratch” chapter. And it is a good one.
What Is SeeClickFix?
SeeClickFix is a platform where residents report issues to their city. Potholes, graffiti, streetlight outages, that kind of thing. Cities use it as a 311 system. It has a public REST API, which makes it a perfect data source for a pipeline project.
The API returns paginated JSON. Each page has up to 100 issues, plus metadata telling you the current page number and total number of pages. This pagination detail matters a lot for how the pipeline is built.
The Pipeline Architecture
The complete pipeline has a few key parts:
- Trigger the flow with a GenerateFlowFile processor
- Query the SeeClickFix API using a Python ExecuteScript processor
- Split the JSON into individual issues
- Add geospatial coordinates in a format Elasticsearch understands
- Extract a unique ID from each issue
- Load into Elasticsearch using upsert
- Loop through all pages using a self-referencing processor
There is also a backfill path for pulling archived data. More on that later.
Step 1: Prepare Elasticsearch
Before building the pipeline, you need to set up your Elasticsearch index. The reason is geospatial data.
Elasticsearch does not automatically know that a pair of numbers represents a location on Earth. You have to tell it. Using Kibana’s Dev Tools, you create an index called “scf” and map a field called “coords” as the geo_point data type.
If you skip this step and just let Elasticsearch auto-detect the field, it will treat coordinates as plain numbers. No map visualizations, no spatial queries. Do this first.
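In Kibana’s Dev Tools, the mapping request looks roughly like this (a sketch assuming Elasticsearch 7.x syntax; on older 6.x versions the properties sit under a type name such as doc):

```
PUT scf
{
  "mappings": {
    "properties": {
      "coords": { "type": "geo_point" }
    }
  }
}
```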
Step 2: Triggering the Pipeline
Here is a problem with NiFi. Processors like ExecuteScript and the HTTP processors need an incoming flowfile to start. They cannot be the first processor in a pipeline.
The workaround is the GenerateFlowFile processor. It creates an empty flowfile with 0 bytes of data. The contents do not matter. What matters is that it creates a flowfile that triggers the next processor.
You set the scheduling to whatever interval you want. Crickard runs his every 8 hours to avoid hammering the API. The GenerateFlowFile processor sends that empty flowfile downstream, and the real work begins.
Step 3: Querying the API with Python in NiFi
This is where Crickard introduces Python scripting inside NiFi. Previous chapters used NiFi’s built-in configuration options. Now you write actual code using the ExecuteScript processor.
NiFi uses Jython (Python running on the JVM), so there are some Java-specific imports you always need. The basic template for any Python ExecuteScript processor has three parts:
- Imports: Java IO libraries plus the StreamCallback class
- A class with a process function: This is where your actual logic goes
- Boilerplate session handling: Check for a flowfile, write output, route to success or failure
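The shape of that template can be sketched in plain Python. The NiFi-specific pieces (the StreamCallback base class from org.apache.nifi.processor.io, the session object, REL_SUCCESS and REL_FAILURE) only exist inside NiFi’s JVM, so io.BytesIO stands in for the flowfile streams here; treat the names as illustrative.

```python
import io
import json

# Inside NiFi/Jython the imports would be roughly:
#   from org.apache.nifi.processor.io import StreamCallback
#   from java.nio.charset import StandardCharsets
# Those classes only exist on the JVM, so this sketch uses plain streams.

class ModJSON(object):
    """Part 2 of the template: a class whose process() holds the real logic."""

    def process(self, input_stream, output_stream):
        # Read the incoming flowfile content, transform it, write it back out.
        record = json.loads(input_stream.read().decode("utf-8"))
        record["processed"] = True  # your actual logic replaces this line
        output_stream.write(json.dumps(record).encode("utf-8"))


# Part 3: boilerplate session handling. In NiFi this is session.get(),
# session.write(flowfile, ModJSON()), then session.transfer(..., REL_SUCCESS).
incoming = io.BytesIO(b'{"id": 1}')
outgoing = io.BytesIO()
ModJSON().process(incoming, outgoing)
result = json.loads(outgoing.getvalue().decode("utf-8"))
```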
The actual API call is standard Python. You build a params dictionary with the city name and page size, make an HTTP request, parse the JSON response, and write it to the output stream.
The key detail: the response contains both the actual issues (an array of 100 records) and metadata about pagination (current page, total pages, next page URL). Both pieces matter for the pipeline.
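As a rough sketch of that logic (the parameter names and response shape follow the public SeeClickFix v2 API as described above, but the sample page below is fabricated, so treat the exact field names as assumptions):

```python
import json

# Request setup: place and page size. In NiFi this runs inside ExecuteScript
# and the raw JSON response is written to the flowfile's output stream.
url = "https://seeclickfix.com/api/v2/issues"
params = {"place_url": "bernalillo-county", "per_page": 100}

# Fabricated stand-in for one page of the API response.
sample_page = json.loads("""
{
  "metadata": {
    "pagination": {
      "page": 1,
      "pages": 44,
      "next_page_url": "https://seeclickfix.com/api/v2/issues?page=2"
    }
  },
  "issues": [
    {"id": 101, "summary": "Graffiti", "lat": 35.0844, "lng": -106.6504}
  ]
}
""")

# Both halves of the response matter: the issues array feeds the SplitJson
# path, and the pagination metadata feeds the page-looping path.
issues = sample_page["issues"]
pagination = sample_page["metadata"]["pagination"]
more_pages = pagination["page"] < pagination["pages"]
```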
Step 4: Splitting and Transforming
The API returns 100 issues in one JSON blob. You need to split them into individual records before loading into Elasticsearch.
SplitJson processor: Set the JsonPath Expression to $.issues and NiFi will take the array of 100 issues and create 100 separate flowfiles. Each one is now a single issue.
Add coordinates: Each issue has separate lat and lng fields. Elasticsearch expects geopoints as a single string like “35.0844,-106.6504”. An ExecuteScript processor reads each issue, concatenates the lat and lng into a new “coords” field, and also extracts the date from the timestamp into a clean “opendate” field.
Extract the ID: Each issue has a unique ID in the JSON. The EvaluateJsonPath processor extracts it and adds it as a flowfile attribute. This becomes the document ID in Elasticsearch.
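A minimal sketch of the coords and opendate transform (the sample issue is made up; the lat, lng, and created_at field names follow the description above):

```python
# A single issue after SplitJson; values are fabricated for illustration.
issue = {
    "id": 101,
    "lat": 35.0844,
    "lng": -106.6504,
    "created_at": "2020-05-01T08:30:00-06:00",
}

# Concatenate lat and lng into the "lat,lon" string Elasticsearch accepts
# for a geo_point field, and trim the timestamp down to a plain date.
issue["coords"] = "{},{}".format(issue["lat"], issue["lng"])
issue["opendate"] = issue["created_at"][:10]
```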
Step 5: Loading into Elasticsearch
The PutElasticsearchHttp processor handles the load. The configuration is straightforward:
- URL: http://localhost:9200
- Identifier Attribute: the ID you extracted
- Index: scf
- Type: doc
- Index Operation: upsert
The upsert operation is the smart choice here. If the document does not exist, it gets created. If it exists and the data changed, it gets updated. If it exists and nothing changed, nothing happens. This means you can run the pipeline repeatedly without creating duplicates. That is exactly what you want for a pipeline that runs every 8 hours.
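Under the hood, an upsert in Elasticsearch is an update request with doc_as_upsert enabled. In Dev Tools it looks roughly like this (a sketch assuming Elasticsearch 7.x endpoints and a made-up document ID):

```
POST scf/_update/101
{
  "doc": { "coords": "35.0844,-106.6504" },
  "doc_as_upsert": true
}
```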
After this step, you have 100 documents in Elasticsearch. But there are thousands more. The API returned 44 pages of results for Bernalillo County. You need to get the other 43 pages.
Step 6: Looping Through All Pages
This is the clever part of the pipeline design.
Remember, the initial query goes to two paths simultaneously. One path is the SplitJson path we just walked through. The other goes to a GetEveryPage processor.
The GetEveryPage processor is another ExecuteScript. It reads the metadata from the flowfile, checks if the current page is less than the total number of pages, and if so, calls the next page URL from the metadata. It writes the new response as a flowfile.
Here is the trick: this processor connects to itself for the success relationship. NiFi allows that. So the processor calls page 2, outputs a flowfile, and that flowfile loops back as input. Now it calls page 3, then page 4, all the way to page 44. When the current page exceeds the total, it stops.
Each page also gets sent to the SplitJson processor, so all issues go through the same transform-and-load path.
It is a simple loop, but it handles the entire pagination problem without any external orchestration.
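Stripped of NiFi, the stopping rule is an ordinary loop. This sketch fakes the HTTP call with a three-page stub so the logic is visible:

```python
def fetch_page(page, total_pages=3):
    # Stand-in for the real API call; returns one fabricated issue per page.
    return {
        "issues": [{"id": page}],
        "metadata": {"pagination": {"page": page, "pages": total_pages}},
    }

# The self-referencing processor is equivalent to this while-loop: fetch a
# page, hand its issues to the transform-and-load path, then follow the
# next page until the current page reaches the total.
all_issues = []
page = 1
while True:
    response = fetch_page(page)
    all_issues.extend(response["issues"])
    pagination = response["metadata"]["pagination"]
    if pagination["page"] >= pagination["pages"]:
        break
    page += 1
```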
Step 7: Backfilling Historical Data
The regular pipeline grabs current issues. But SeeClickFix also has an archive of closed issues. If you want the full history, you need to backfill.
The approach is elegant. Copy the QuerySCF processor, rename it to QuerySCFArchive, and add one parameter: 'status': 'Archived'. Connect it to the same SplitJson and GetEveryPage processors.
Run it once. It will pull all archived pages through the same transform-and-load pipeline. Once it finishes, stop the archive processor. You now have both current and historical data in Elasticsearch.
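The change amounts to one extra key in the request parameters (a hedged sketch; parameter names as described above):

```python
# Parameters for the regular QuerySCF processor.
params = {"place_url": "bernalillo-county", "per_page": 100}

# QuerySCFArchive: the identical query plus the archive status filter.
archive_params = dict(params, status="Archived")
```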
Crickard makes a good point here. This backfill pattern is common in data engineering. You build the pipeline for new data first, make sure it works, then add a backfill path to catch up on history. You also keep the backfill capability around. If your data warehouse gets corrupted or you spin up a new one, you can re-run the backfill to rebuild everything.
Building a Kibana Dashboard
The second half of the chapter covers visualization. Once your data is in Elasticsearch, Kibana can turn it into something useful.
Setting up the index pattern: In Kibana’s management section, create an index pattern for “scf*”. Select “created_at” as the time filter field. This lets you use time-based filtering across all visualizations.
Visualizations Crickard builds:
- Bar chart: Issues by month using a Date Histogram aggregation on created_at. Shows trends over time.
- Metric: A simple count of total issues. Useful because it updates when you apply filters, so you always know the size of your current selection.
- Pie chart (donut): Top 5 issue types using a Terms aggregation on the request type field. Shows what people complain about most.
- Markdown panel: A text panel with a description of the dashboard. Not a visualization, but adds context.
- Map: Uses the geopoint data you set up at the beginning. Shows where issues are located geographically.
The dashboard: You combine all these visualizations into a single dashboard. Drag, resize, rename panels. The real power is in filtering.
Filtering examples:
- Click “Graffiti” in the pie chart and the entire dashboard updates to show only graffiti issues. The map shows graffiti locations, the bar chart shows graffiti over time, and the metric shows the count.
- Set the time filter to last 7 days. Everything updates to show only recent data.
- Draw a rectangle on the map to filter by geography. Only issues inside that area show up across all visualizations.
These filters work across all panels simultaneously. That is what makes Kibana dashboards useful for analysts and managers who do not write code.
Key Takeaways
- GenerateFlowFile is the trigger trick. When your first real processor needs an incoming flowfile, use GenerateFlowFile to create an empty one. It is a common NiFi pattern.
- Python in NiFi uses Jython. You get standard Python libraries but need Java-specific imports for IO. The boilerplate is always the same, so you just change the process function.
- Self-referencing processors handle pagination. Connecting a processor to itself creates a loop. Combined with page metadata, it processes every page without external logic.
- Upsert prevents duplicates. When your pipeline runs on a schedule, upsert ensures you do not create duplicate records. It inserts new data and updates changed data.
- Backfill is a separate path, same pipeline. Copy your query processor, add a status filter, connect it to the same downstream processors. Run once, then stop it.
- Map your geospatial fields before loading data. Elasticsearch needs to know a field is a geopoint before data arrives. Set up the index mapping first.
- Dashboards validate your pipeline. Building a quick Kibana dashboard is not just for analysts. It lets you visually check whether your pipeline is working correctly and the data makes sense.
My Take
This is the chapter where the book clicks. The previous chapters taught tools in isolation. This one shows how they fit together in a real project with a real API, real pagination, real geospatial data, and a real dashboard.
The self-referencing processor pattern for pagination is particularly useful. It is one of those things that seems obvious once you see it, but you might not think of it on your own.
One thing I would add: error handling for API failures. The code uses a basic try/except that catches everything, but in production you want to handle specific failures differently. What if the API returns a 429 (rate limit)? What if it times out? What if one page fails halfway through? Those are production concerns, and Crickard hints that the next chapter covers exactly that.
The Kibana section is a nice bonus. Most data engineers think their job ends when the data lands in the warehouse. But being able to quickly spin up a dashboard is a valuable skill, especially for validating your own work. If 100% of your issues are showing up in Antarctica on the map, you know something went wrong with your coordinate transformation.