My First DAG or If Your Dreams Don't Scare You
I just came back from visiting family in Tanzania and I feel that the big themes in my trip were courage and fearlessness. I think of these as two sides of the same coin. Without both sides, the value of the coin, in my opinion, depreciates. I believe one can be courageous, but be ruled by fear; and similarly one can be fearless but lack gall. I came back to work with these two themes in mind. I wanted to be bold in my work this year because if your dreams don’t scare you, they are not big enough.
My First DAG
I finally wrote my first Airflow DAG, and it runs in production! I don’t really use it (yet), but it was a good stepping stone. All the DAG does is run a query to find the difference between two dated tables: the latest run and the one prior. It ended up being good practice because it reinforced how execution dates and schedule intervals work in Airflow. (The documentation on this behavior is great across the site, if you happen to run into it. It often notes that something will run up until a given date, but not including that date, and so on.) During testing I was always triggering the DAG manually, which had very specific effects on my templated variables. For example, our latest table is always dated as of yesterday. When I triggered the DAG manually, I was inadvertently sending it an execution_date of today. In my code, I compensated for this by always subtracting 1 day, and for the second_to_latest_table I would subtract 2 days. In scheduled production runs, yesterday was actually the execution_date, so the built-in variables such as yesterday_ds were all I needed. It was a learning moment for me and it continues to pay dividends. I recently learned that in the future I could mimic execution dates by using the command line tool: airflow trigger_dag dag_id -e execution_date. I’m not aware of this being available through the UI, but it would be useful!
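To make the date arithmetic above concrete, here is a minimal sketch in plain Python, with no Airflow import. It assumes a daily schedule, where a scheduled run's execution_date is the start of the interval it covers (the day before the run actually fires), while a manual trigger gets an execution_date of roughly the moment it was triggered. The helper names and the sample date are made up for illustration; ds and yesterday_ds only imitate the Airflow template variables of the same names.

```python
from datetime import date, timedelta


def ds(execution_date: date) -> str:
    """Imitates Airflow's ds variable: the execution date as YYYY-MM-DD."""
    return execution_date.isoformat()


def yesterday_ds(execution_date: date) -> str:
    """Imitates Airflow's yesterday_ds: the day before the execution date."""
    return (execution_date - timedelta(days=1)).isoformat()


def tables_for_scheduled_run(execution_date: date) -> tuple:
    """Scheduled daily run: execution_date is already 'yesterday'."""
    return ds(execution_date), yesterday_ds(execution_date)


def tables_for_manual_run(execution_date: date) -> tuple:
    """Manual trigger: execution_date is 'today', so shift back 1 and 2 days."""
    latest = execution_date - timedelta(days=1)
    second_to_latest = execution_date - timedelta(days=2)
    return latest.isoformat(), second_to_latest.isoformat()


# Hypothetical wall-clock day on which the DAG actually runs.
run_day = date(2020, 1, 15)

# A scheduled run on run_day carries the previous day as its execution_date.
scheduled = tables_for_scheduled_run(run_day - timedelta(days=1))
# A manual trigger on run_day carries run_day itself.
manual = tables_for_manual_run(run_day)

print(scheduled)  # (latest table date, second-to-latest table date)
print(manual)
```

Both paths land on the same pair of table dates, which is why the manual compensation was only ever needed during testing.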
My Second (and maybe Third?) DAG
Most of this post is actually about some key aspects of the process I went through to write my second DAG. In my previous post, I talked about being inspired to start this journey, and this DAG is what has been running in my imagination since that car ride to Los Angeles. It’s the real inspiration. There was not much on paper back then. I didn’t really understand Airflow either (still don’t!), so I wasn’t sure what the limits to my imagination were going to be. At that point, I had run a previous version of this data submission process using Jupyter Notebooks written by a colleague. For this year, I transitioned those notebooks to papermill, further automating the process and reducing the number of notebook cells that a human would have to run. Prior to using papermill, if you had asked me about the feasibility of putting this entire pipeline on Airflow, I would have thought it a cool idea but would not have believed it was possible. Certainly, I would have thought that my skillset wasn’t ready to handle something that big yet. I take this space to affirm, for myself if no one else, that progress is an incremental process. Mayuko recently said it more succinctly: patient progress builds! Dreams, however, are monumental; they don’t often come to us with outlined steps for realizing them. It’s precisely that scariness, though, that confirms that I have given my imagination ample room to grow. In the process of growing into that imagination, I will find pieces of myself: the courage it took to start the journey and the strength it takes to reject fear along the way.
I had a meeting with my manager in which I discussed the progress I had made on a SMART goal since our last meeting. I recount one amusing exchange here. I was describing, with all the confidence in the world that this was a novel breakthrough, how I was thinking about developing this DAG in three parts. I had finished the part that downloads the files from the Student Information System and was working on the other two: the part where I fix the data up a little bit, and the final part that sends it to the SFTP server. My manager asked, “Do you know what you just described?” and I, beaming, said, “Uh, wrangling??” “ETL.” I couldn’t help but chuckle. The whole time I was describing my plan, it did not occur to me that I was describing an ETL process. I sometimes have to remind myself that I do not know what all the technical things are called. However, just because I’m not calling it ETL doesn’t mean I’m not gaining experience in building ETL processes. It’s less of an issue here because I happen to know what ETL is and means, but it’s important to keep in mind as I think about designing a framework for this DAG without always knowing the technical lingo.
Frameworks or Design Patterns?
I often think about the work I’m doing as a “framework”. I’m building something that gets the job done for now. I’m trying to structure it in a way that keeps the code organized, but I never know whether that structure has a “name” per se. I’ve settled on focusing on building a functional tool for now; in a second or third revision, I can start thinking about optimizing or explicitly using a design pattern. Premature optimization is often a topic of conversation in programmer circles, but I figured I would add my 2 cents here. I feel drawn to try to optimize or pick the best structure for the code, but it often hinders my progress more than anything. One decision that I’m pretty excited about and look forward to refining in the future is how I have used classes in the current design.
We have 18 txt files that we have to submit as zip documents to the SFTP server. There is some wrangling that all 18 files have to go through, but there are some things that might be unique to some files.
- To solve for this, I created a base class with the universal functionality and particularly one method called process_the_file. All subclasses, or all files, are expected to override this method.
- Every subclass can have additional methods that are unique for their file, but they must be set up to be called within the
- At the DAG level, I use the PythonOperator to bring it all together. I create an additional class which has the list of all these 18 files or subclasses as an attribute and a callable that executes the process_the_file method for each one.
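The class layout described in this list could look roughly like the sketch below. Only process_the_file comes from the actual design; every other name (BaseFile, EnrollmentFile, StaffFile, Submission, process_all, the helper methods) is hypothetical, the rows are in-memory lists rather than real txt files, and I am assuming that file-specific methods get called from inside the overridden method. Two files stand in for the 18.

```python
class BaseFile:
    """Universal wrangling shared by every submission file."""

    name = "base"

    def __init__(self, rows):
        # rows is a list of lists of strings standing in for a txt file.
        self.rows = rows

    def strip_whitespace(self):
        """Wrangling step that every file goes through."""
        self.rows = [[cell.strip() for cell in row] for row in self.rows]

    def process_the_file(self):
        """Every subclass is expected to override this method."""
        raise NotImplementedError


class EnrollmentFile(BaseFile):
    name = "enrollment"

    def process_the_file(self):
        self.strip_whitespace()
        return self.rows


class StaffFile(BaseFile):
    name = "staff"

    def uppercase_ids(self):
        """A step unique to this file (hypothetical example)."""
        self.rows = [[row[0].upper()] + row[1:] for row in self.rows]

    def process_the_file(self):
        self.strip_whitespace()
        self.uppercase_ids()
        return self.rows


class Submission:
    """Holds the list of files and exposes one callable for the DAG."""

    def __init__(self, files):
        self.files = files

    def process_all(self):
        """Runs process_the_file on each file; returns rows keyed by name."""
        return {f.name: f.process_the_file() for f in self.files}


submission = Submission([
    EnrollmentFile([[" 001 ", " Ada "]]),
    StaffFile([[" t9 ", " Grace "]]),
])
result = submission.process_all()
print(result)
```

In the DAG itself, submission.process_all would be what gets passed as the python_callable of a PythonOperator. The base class raising NotImplementedError is one way to make a forgotten override fail loudly.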
There is an additional problem. Year to year, we sometimes run into an issue that can’t be solved at the source. For example, the Student Information System might produce an error in a file because our setup is unique. For these instances, we need to intervene directly on the file. When we were using Jupyter Notebooks, this process felt “seamless” in that it didn’t look different from any of the other file processing steps we were doing. However, if this is a DAG that is meant to be used year to year, we needed a way to “plug in” these manual changes and unplug when it’s a different year. For now, I rely on a class used for one-off changes. It has a main method that will execute all one-off changes. As additional one-off changes need to be made, they are added as methods and added to the method that will be executed. That executed method is then the callable for the PythonOperator.
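As a rough illustration of that plug-in idea, here is a minimal sketch. The class name, the two example fixes, and the sample data are all invented; the only parts taken from the description above are the shape (one method per one-off change, plus a main method that runs them all and serves as the callable).

```python
class OneOffFixes:
    """Year-specific manual corrections (all names here are hypothetical)."""

    def __init__(self, files):
        # files maps a file name to its rows (a list of lists of strings).
        self.files = files

    def blank_out_placeholder_dates(self):
        """Example fix: replace a placeholder date with an empty string."""
        self.files["enrollment"] = [
            ["" if cell == "1900-01-01" else cell for cell in row]
            for row in self.files["enrollment"]
        ]

    def drop_test_student(self):
        """Example fix: remove a test record that should not be submitted."""
        self.files["enrollment"] = [
            row for row in self.files["enrollment"] if row[0] != "TEST"
        ]

    def run_all(self):
        """Main method: runs every registered fix, in order."""
        for fix in (self.blank_out_placeholder_dates, self.drop_test_student):
            fix()
        return self.files


files = {"enrollment": [["001", "1900-01-01"], ["TEST", "2020-01-01"]]}
fixed = OneOffFixes(files).run_all()
print(fixed)
```

Unplugging a fix for a different year then means removing it from the tuple in run_all, without touching the rest of the DAG.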
At this point, the DAG is filling up but I still have the issue of validations to consider. To rehash: we need to check that the files produced at the end match the expectations of the state system and that they match the expectations of our organization. As I was writing validations, I ran into two problems:
- Why am I writing validations for the state system? The state system will list out everything that is wrong with the files that you produced and has a “reject this data” feature if there are too many errors.
- To validate the data against what the organization expects would require querying the database. This is possible but complicates the DAG.
For the first iteration, I’m not going to write validations against the state system requirements. I’m duplicating work and the system finds few errors if our Student Information System is set up as cleanly as possible. Unlike the first issue, no system exists already to check that the data matches our organizational expectations. To that end, this tool needs to be built, but likely as a separate DAG (the third DAG). There are two reasons for my thinking here:
- No matter how bad the data is, I still want it to be sent to the state system so that I can get the errors that the state system produces. This is to say that these validations can happen after the submission DAG completes.
- I need to access both our database and the file’s data at the same time. This sounds like a great opportunity to write a general operator that takes a query and the file identifier to do the validations.
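None of this is built yet, so the following is only one possible shape for the "query plus file" check, written as a plain Python function rather than an Airflow operator. The function name, table, columns, and sample rows are all hypothetical, and an in-memory SQLite database stands in for the real one.

```python
import sqlite3


def validate_file_against_query(conn, query, file_rows):
    """Compare the rows a query returns with the rows found in a file.

    Returns the rows the database expects but the file lacks, and the
    rows the file contains but the database does not expect.
    """
    expected = {tuple(str(v) for v in row) for row in conn.execute(query)}
    actual = {tuple(row) for row in file_rows}
    return {
        "missing_from_file": sorted(expected - actual),
        "unexpected_in_file": sorted(actual - expected),
    }


# In-memory SQLite as a stand-in for the organization's database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (student_id TEXT, grade TEXT)")
conn.executemany(
    "INSERT INTO students VALUES (?, ?)",
    [("001", "9"), ("002", "10")],
)

# Rows as they might appear in one submission file.
file_rows = [["001", "9"], ["003", "11"]]

report = validate_file_against_query(
    conn, "SELECT student_id, grade FROM students", file_rows
)
print(report)
```

Because the check only reports differences and never blocks anything, it fits the idea of running after the submission DAG completes.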
There’s no shortage of storage solutions, and with the flexibility of Airflow and compute services, the world’s your oyster. Our Airflow production server had two major storage components: S3 and Snowflake. My concern with these tools was that our organization operated primarily out of Google Drive. I originally argued that storing these zip documents in the Drive would make sense, but my team convinced me that since the infrastructure for S3 was up and running, it would be best to download from S3. When I start considering where to put and how to format validation outputs, they will also be stored in S3 or Snowflake.
Conclusion / Meta Discussion
The structure of this post was less smooth than I would have liked. However, I think the best way to view these posts is as highlights of the conversations and processes that I go through to reimagine state reporting. I’ve been thinking about doing a presentation on these things one day, and maybe that’s when synthesis will matter most. I’m also still finding my writer’s voice after so long. For this conclusion, I want to look at what we set out to accomplish with this DAG and what we have accomplished.
Set Out to Accomplish
- Automate the mechanics of submitting files to the state system
- Make the process reliable/robust (error-handling, etc.)
- Send valid and trusted data
Accomplished
- Automated the mechanics (the computer takes 8-10 minutes daily; the previous process needed at least a 15-minute block of human time and was done once a month)
- Designed the codebase to be re-usable and easily editable for annual one-off fixes
- Planned to add a validation layer as a separate DAG