Working with Databases - Study Notes from Data Engineering with Python Ch 4
Most data pipelines start with a database. Most of them end with one too. Chapter 4 of Paul Crickard’s book is about connecting Python to databases and moving data between them. If the previous chapter was about flat files, this one is where things get real.
The chapter covers three things: working with PostgreSQL (relational), working with Elasticsearch (NoSQL), and building actual pipelines in both Airflow and NiFi that move data from one to the other.
PostgreSQL with Python
Setting Up
The book uses a PostgreSQL database called dataengineering with a users table. The table has columns for name, id, street, city, and zip. You create all this through pgAdmin 4, which is the web GUI for PostgreSQL.
Nothing fancy here. Standard relational database stuff. Tables with rows and columns, relationships between them. Relational databases go back to E.F. Codd’s work in 1970, and the core ideas haven’t changed much.
Connecting with psycopg2
The Python library used here is psycopg2. It’s built specifically for PostgreSQL. There are other options like SQLAlchemy (which is an ORM and lets you write queries in a more Pythonic way) or pyodbc, but psycopg2 is the straightforward choice for direct database access.
Install it with pip:
pip3 install psycopg2-binary
The -binary version is precompiled and avoids dependency headaches. Worth knowing about.
The connection pattern is always the same four steps:
- Import the library
- Build a connection string with host, database name, username, and password
- Create a connection object
- Create a cursor from that connection
Once you have a cursor, you can run any SQL against the database.
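As a minimal sketch of those four steps (assuming the book's local setup: database `dataengineering`, user `postgres` — swap in your own credentials):

```python
def build_conn_string(host, dbname, user, password):
    """Step 2: assemble the libpq-style connection string."""
    return f"host={host} dbname={dbname} user={user} password={password}"

def connect(host="localhost", dbname="dataengineering",
            user="postgres", password="postgres"):
    """Steps 1, 3, and 4: import the library, open a connection, get a cursor."""
    import psycopg2  # pip3 install psycopg2-binary
    conn = psycopg2.connect(build_conn_string(host, dbname, user, password))
    return conn, conn.cursor()
```

From there, `cur.execute(...)` runs any SQL against the database.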
Inserting Data
For inserting a single record, you write a SQL INSERT statement and pass it to the cursor’s execute() method. Here’s the thing though: after executing, the data is not actually in the database yet. You have to call commit() on the connection to make it permanent.
This trips people up. You run the insert, check pgAdmin, and see nothing. The transaction hasn’t been committed yet.
The book also shows a neat debugging tool: mogrify(). It returns the exact SQL string that would be sent to the database. Very helpful when your queries aren’t working and you need to see what’s actually being sent.
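Putting `execute()`, `commit()`, and `mogrify()` together — a sketch assuming the `users` table columns described earlier (the column order here is an assumption):

```python
INSERT_QUERY = ("INSERT INTO users (id, name, street, city, zip) "
                "VALUES (%s, %s, %s, %s, %s)")

def insert_user(conn, cur, record):
    """record is a tuple matching the column order in INSERT_QUERY."""
    print(cur.mogrify(INSERT_QUERY, record))  # debug: the exact SQL being sent
    cur.execute(INSERT_QUERY, record)
    conn.commit()  # without this, nothing shows up in pgAdmin
```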
Bulk Inserts
For inserting lots of records, the book uses the Faker library to generate dummy data and executemany() instead of execute(). The approach is:
- Generate your data as a list of tuples
- Convert it to a tuple of tuples
- Use executemany() with a parameterized query
- Commit
Using executemany() is cleaner than looping over execute() for each record, and the whole batch is committed as a single transaction. In the example, 1,000 fake user records get inserted this way.
One important detail: use %s placeholders in your query string instead of Python’s format() or f-strings. Let psycopg2 handle the type mapping. It’s safer and avoids SQL injection issues.
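A sketch of the bulk-insert flow with Faker (the exact Faker field calls are illustrative, not copied from the book):

```python
def make_fake_users(n):
    """Generate n dummy user records as a tuple of tuples."""
    from faker import Faker  # pip3 install faker
    fake = Faker()
    return tuple((i, fake.name(), fake.street_address(), fake.city(),
                  fake.zipcode()) for i in range(n))

# %s placeholders, never format() or f-strings: psycopg2 escapes the values.
BULK_QUERY = ("INSERT INTO users (id, name, street, city, zip) "
              "VALUES (%s, %s, %s, %s, %s)")

def bulk_insert(conn, cur, records):
    cur.executemany(BULK_QUERY, records)
    conn.commit()
```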
Reading Data Back
Extracting data follows the same connection pattern. The only difference is you use a SELECT statement instead of INSERT.
After executing a SELECT, the cursor becomes iterable. You can loop through it directly, or use one of the fetch methods:
- fetchall() grabs everything
- fetchmany(n) grabs n records
- fetchone() grabs a single record
You can also check cur.rowcount for the total number of results and cur.rownumber to see your current position in the result set.
There’s also a copy_to() method that exports an entire table straight to a CSV file. Quick and useful for data dumps.
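Sketched with the same connection objects (the CSV path is illustrative):

```python
SELECT_QUERY = "SELECT * FROM users"

def read_users(cur, n=5):
    cur.execute(SELECT_QUERY)
    print(cur.rowcount)       # total number of rows in the result
    rows = cur.fetchmany(n)   # or fetchall() / fetchone()
    print(cur.rownumber)      # current position in the result set
    return rows

def dump_users_csv(cur, path="users.csv"):
    """copy_to() streams the whole table straight to a file."""
    with open(path, "w") as f:
        cur.copy_to(f, "users", sep=",")
```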
Using pandas DataFrames
Here’s where it gets more practical. Instead of manually iterating through cursors, you can use pandas’ read_sql() method. Pass it a SQL query and a connection, and you get a DataFrame back.
From there, you have the full pandas toolkit. Export to JSON, filter, transform, whatever you need. This is the approach you’ll probably use most in real data engineering work.
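Since read_sql() accepts any DB-API connection, the same function works with the psycopg2 connection from earlier (column list assumed from the book's table):

```python
import pandas as pd  # pip3 install pandas

def users_dataframe(conn):
    """Query straight into a DataFrame; conn is any DB-API connection."""
    return pd.read_sql("SELECT name, id, street, city, zip FROM users", conn)
```

From the returned DataFrame, `df.to_json(orient="records")` or any other pandas transform applies.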
Elasticsearch with Python
What Is Elasticsearch?
Elasticsearch is a NoSQL database. NoSQL is a broad term for databases that don’t use traditional rows and columns. Elasticsearch stores data as JSON documents and has its own query DSL rather than SQL.
There are other types of NoSQL databases too: columnar, key-value, time-series. Elasticsearch falls into the document store category.
Connecting and Inserting
The Python library is simply called elasticsearch. Install it with pip, and the version should match your Elasticsearch server version.
Connecting is simpler than PostgreSQL. You create an Elasticsearch() instance and it defaults to localhost. To insert a document, you use the index() method and pass it an index name, document type, and a JSON body.
Here’s the key difference from relational databases: you don’t need to create the index (like a table) ahead of time. Elasticsearch creates it automatically when you first insert a document. That’s convenient but can also lead to unexpected mappings if you’re not careful.
The method returns a result object. Check the result key to see if it says “created”. If it does, your document is in.
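A sketch using the older 7.x-style client the book works with (doc_type has since been removed from newer client versions; the sample document is illustrative):

```python
SAMPLE_DOC = {"name": "Ann Example", "street": "1 Main St",
              "city": "Springfield", "zip": "12345"}  # illustrative document

def insert_doc(doc, index="users"):
    from elasticsearch import Elasticsearch  # version should match your server
    es = Elasticsearch()  # defaults to localhost:9200
    res = es.index(index=index, doc_type="doc", body=doc)
    if res["result"] == "created":
        print("indexed", res["_id"])
    return res
```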
Bulk Inserts
For loading lots of documents, the helpers.bulk() method is the way to go. You build an array of JSON objects, each containing:
- _index: which index to put it in
- _type: the document type
- _source: the actual document data
Then pass the whole array to helpers.bulk() along with your Elasticsearch instance. The library handles batching for you.
You can verify the results through Kibana by creating an index pattern and viewing the documents in the Discover tab.
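Building the action envelopes is plain Python; only the final call needs a live cluster (a sketch, with the index name as an assumption):

```python
def make_actions(docs, index="users"):
    """Wrap each document in the bulk-action envelope helpers.bulk() expects."""
    return [{"_index": index, "_type": "doc", "_source": doc} for doc in docs]

def bulk_load(docs, index="users"):
    from elasticsearch import Elasticsearch, helpers
    es = Elasticsearch()
    helpers.bulk(es, make_actions(docs, index))  # library batches the requests
```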
Querying Elasticsearch
Querying uses the search() method instead of index(). You pass it a query body as a JSON object.
The simplest query is match_all, which returns everything (up to the size limit you set). Results come back in a nested structure under res['hits']['hits'], and each document’s actual data lives in _source.
You can also search specific fields with match. For example, searching for a name or a city. But here’s the thing about Elasticsearch: it tokenizes strings. If you search for “Jamesberg” in the city field, you might also get “Lake Jamesberg” because Elasticsearch splits multi-word values into tokens.
To handle this, you use Boolean queries with must, must_not, and filter clauses. A Boolean query lets you combine multiple conditions. You can match on city while filtering by zip code to narrow down results.
There’s also Lucene syntax support, where you pass queries as simple strings like name:Ronald Goodman using the q parameter. Good for quick searches.
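The query bodies themselves are just nested dicts. A sketch of the three styles (field names follow the users index from earlier; the values are illustrative):

```python
def match_all(size=10):
    return {"query": {"match_all": {}}, "size": size}

def match_city(city):
    return {"query": {"match": {"city": city}}}  # tokenized: may over-match

def city_with_zip(city, zip_code):
    """Bool query: match on city, then filter exact on zip to drop near-misses."""
    return {"query": {"bool": {"must": {"match": {"city": city}},
                               "filter": {"term": {"zip": zip_code}}}}}

def run(es, body, index="users"):
    res = es.search(index=index, body=body)
    return [hit["_source"] for hit in res["hits"]["hits"]]

# Lucene shorthand: es.search(index="users", q="name:Ronald Goodman")
```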
Handling Large Result Sets with Scroll
Elasticsearch caps a single search at 10,000 documents by default. If you need more than that, you use the scroll API.
The idea is simple:
- Run your search with a scroll parameter that sets a time window (like 20 minutes)
- Save the _scroll_id from the response
- Loop: call the scroll() method with that ID to get the next batch
- Keep going until you get zero results back
This is similar to pagination, but the scroll maintains a consistent snapshot of the data. The time window is how long Elasticsearch keeps the scroll context alive.
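The scroll loop can be sketched as a function that works with any client object exposing search() and scroll():

```python
def scroll_all(es, index="users", page_size=500, window="20m"):
    """Page through every document; the window keeps the scroll context alive."""
    res = es.search(index=index, scroll=window,
                    body={"query": {"match_all": {}}, "size": page_size})
    docs = []
    while res["hits"]["hits"]:                  # empty page means we're done
        docs.extend(hit["_source"] for hit in res["hits"]["hits"])
        res = es.scroll(scroll_id=res["_scroll_id"], scroll=window)
    return docs
```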
Building a Pipeline in Airflow
This is where the chapter ties it all together. The pipeline reads data from PostgreSQL and loads it into Elasticsearch.
The DAG Structure
The Airflow DAG has two Python operators:
- QueryPostgreSQL: connects to PostgreSQL, runs a SELECT query using pandas, and saves the results as a CSV file
- InsertDataElasticsearch: reads that CSV into a DataFrame, iterates through the rows, converts each to JSON, and indexes them into Elasticsearch
The first task is upstream, the second is downstream. Simple two-step pipeline.
Why Two Tasks Instead of One?
The book makes a good point here. You could put all the logic in one function. But you shouldn’t. Each task in a pipeline should be atomic, meaning it can stand on its own. If everything is in one function and it fails, you don’t know if the read failed or the write failed. With separate tasks, Airflow shows you exactly where things broke.
This matters more as your pipelines get complicated.
The Boilerplate
Every Airflow DAG needs:
- Library imports (DAG, operators, your custom libraries)
- Default arguments (owner, start date, retries, retry delay)
- The DAG definition with a schedule interval
- Operator definitions linking tasks to Python functions
The schedule interval can be a cron expression or a timedelta. In the example, the DAG runs every 5 minutes.
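Sketched end to end (DAG name, task ids, owner, index name, and the CSV path are assumptions patterned on the chapter; the airflow imports are deferred so the task logic reads on its own):

```python
import datetime as dt

default_args = {
    "owner": "paulcrickard",           # assumption: author's name as owner
    "start_date": dt.datetime(2020, 4, 2),
    "retries": 1,
    "retry_delay": dt.timedelta(minutes=5),
}

def query_postgresql():
    """Upstream task: SELECT with pandas, dump to CSV."""
    import pandas as pd
    import psycopg2
    conn = psycopg2.connect(
        "host=localhost dbname=dataengineering user=postgres password=postgres")
    df = pd.read_sql("SELECT name, city FROM users", conn)
    df.to_csv("postgresqldata.csv", index=False)

def insert_elasticsearch():
    """Downstream task: read the CSV, index each row as a JSON document."""
    import pandas as pd
    from elasticsearch import Elasticsearch
    es = Elasticsearch()
    df = pd.read_csv("postgresqldata.csv")
    for _, row in df.iterrows():
        es.index(index="frompostgresql", doc_type="doc", body=row.to_json())

def build_dag():
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    dag = DAG("MyDBdag", default_args=default_args,
              schedule_interval=dt.timedelta(minutes=5))  # or a cron string
    get_data = PythonOperator(task_id="QueryPostgreSQL",
                              python_callable=query_postgresql, dag=dag)
    insert_data = PythonOperator(task_id="InsertDataElasticsearch",
                                 python_callable=insert_elasticsearch, dag=dag)
    get_data >> insert_data  # upstream >> downstream
    return dag
```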
The Idempotency Problem
Here’s the problem the book flags: this pipeline is NOT idempotent. Every time it runs, it inserts the same records into Elasticsearch again with new IDs. So after a few runs, you have duplicates everywhere.
Idempotent means running something multiple times produces the same result as running it once. That’s a critical property for production pipelines. The book says it’ll address this in Section 2 (Chapter 7, Features of a Production Pipeline).
This is a good callout. It’s easy to build a pipeline that works once. Building one that works correctly on the 100th run is the real challenge.
Building the Same Pipeline in NiFi
NiFi takes a different approach. Instead of writing Python code, you drag and drop processors onto a canvas and configure them.
Three Processors
The NiFi version has three steps instead of two:
- ExecuteSQLRecord: connects to PostgreSQL and runs the SELECT query
- SplitText: takes the batch result and splits it into individual records (one flowfile per row)
- PutElasticsearchHttp: sends each record to Elasticsearch
Configuration Details
The ExecuteSQLRecord processor needs a DBCPConnectionPool service. This is where you configure your database connection using a JDBC connection string. The book has a nice tip: name the service after the database (like “dataengineering”), not after the database type (“PostgreSQL”). You’ll have multiple PostgreSQL connections eventually, and naming by type gets confusing fast.
You also need a JSONRecordSetWriter service with “Output Grouping” set to “One Line Per Object”. This is important and easy to miss.
The SplitText processor splits the output into individual flowfiles, one per line. And the PutElasticsearchHttp processor sends them to Elasticsearch at localhost:9200.
Same Idempotency Issue
Just like the Airflow version, this NiFi pipeline is not idempotent. Every run duplicates data. The fix comes later in the book.
Key Takeaways
psycopg2 for PostgreSQL: connect, cursor, execute, commit. That’s the pattern. Always commit after inserts.
Elasticsearch Python client: simpler connection model, JSON in and out. Use helpers.bulk() for large datasets. Use scroll for large result sets over 10,000 docs.
pandas is the bridge: read_sql() for databases, read_csv() for files. DataFrames make it easy to move data between formats.
Atomic tasks: keep pipeline steps separate. One task, one job. Makes debugging possible.
Idempotency matters: a pipeline that creates duplicates on every run is broken for production. This is a design problem, not a coding problem.
NiFi vs Airflow: NiFi is visual and configuration-driven. Airflow is code-driven. Both can build the same pipeline. Your team’s skills and preferences should guide the choice.
The chapter does a solid job of covering both relational and NoSQL databases in a practical way. The progression from “here’s how to connect” to “here’s a full pipeline” makes sense. The honest acknowledgment that these pipelines aren’t production-ready yet is refreshing. Too many tutorials pretend the happy path is the only path.
Next chapter covers cleaning and transforming data, which is what happens between the extract and load phases. That’s where the real data engineering work lives.