
How to Improve a DAG

This post is part of a collaboration between Alisa Aylward of Alisa in Techland and Jared Rand of Skillenai. View Jared’s post on Alisa in Techland here.


Discover the worst data pipeline ever and how to improve its DAG. Learn to remove cycles and handle dependencies efficiently.

What is a DAG?

DAG stands for directed acyclic graph. In the sphere of data engineering, you will often hear DAG thrown around as though it is synonymous with a data pipeline; it is actually a more general mathematical concept. Data pipelines fit the definition of DAG, but not all DAGs are data pipelines. Data pipelines are DAGs because they are:

  • Directed, because there is a defined, non-arbitrary order of tasks
  • Acyclic, because they never loop back on themselves; a cycle would mean the pipeline could run forever
  • Graph, because their visual representation consists of nodes and edges

A non-math example of a DAG is a family tree (Wikipedia). So DAGs exist all around us, but what makes them good or bad in the world of data pipelining?

A note on terminology: mathematically, a DAG consists of nodes, also called vertices (the circles), and edges (the lines connecting them), but in this post, I will refer to the nodes as “tasks”. This is what they are called in Apache Airflow, and it more accurately describes their function as a “unit of work within a DAG” (Source).
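To make the definition concrete, here is a minimal sketch of a pipeline expressed as a DAG using Python's standard-library graphlib module (Python 3.9+). The task names are borrowed from the pipeline discussed in this post, but the edges between them are assumptions made up for illustration.

```python
from graphlib import TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on.
# These edges are illustrative assumptions, not the real pipeline.
dependencies = {
    "views_today": {"views_raw"},
    "new_visitors": {"visitors_raw"},
    "acquisition_today": {"views_today", "new_visitors", "campaigns_raw"},
}

# Because the graph is directed and acyclic, a valid run order exists:
# every task appears after all of the tasks it depends on.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

The exact order of independent tasks may vary, but a task never appears before its dependencies.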

What is a bad DAG?

There is no formal definition of a bad DAG, but there are attributes of a DAG that can cause inefficiencies. Jared designed the “worst data pipeline ever” to display some of the common pipeline design mistakes:


Image credit – Worst data pipeline ever

How do we improve a bad DAG?

Cycles

A DAG can be so bad that it is not even a DAG; in this case, the data pipeline has cycles so it is not acyclic. The cycles in this pipeline are highlighted below:

Image by author – Worst data pipeline, with cycles

If this pipeline were coded in an Airflow DAG file, the Airflow webserver would neither render it nor run it. It would display the error: “Cycle detected in DAG”.

It is hard to say how to fix these cycles without seeing the data, but the self-referencing flow of past_acquisition -> past_acquisition should be combined into one task. The one thing I would caution against is solving this by duplicating the past_acquisition task. That cycle has to be eliminated by entirely refactoring the code to handle the update.

The same is true for the new_visitors -> past_visitors -> new_visitors cycle. The cycles have to be eliminated within the code.
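A cycle like this can be caught before it ever reaches a scheduler. The sketch below uses the standard-library graphlib module rather than Airflow itself, and the dependency on visitors_raw is an assumption added for illustration; only the new_visitors and past_visitors loop comes from the pipeline above.

```python
from graphlib import CycleError, TopologicalSorter

# task -> set of tasks it depends on (edges partly assumed for illustration)
dependencies = {
    "new_visitors": {"visitors_raw", "past_visitors"},
    "past_visitors": {"new_visitors"},
}

try:
    list(TopologicalSorter(dependencies).static_order())
    cycle = None
except CycleError as err:
    # The second element of args lists the tasks that form the cycle,
    # with the first task repeated at the end.
    cycle = err.args[1]

print("Cycle found:", cycle)
```

Once the update logic is refactored so that neither task depends on the other's output, the same check passes and a run order can be computed.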

Dependencies

Although I don’t know the data associated with this DAG, it looks like the underlying architectural issue here is one of database design. Specifically, views, visitors, and campaigns are their own entities and should have some sort of physical manifestation before code is built on top of them to produce the reporting table acquisition_today. Assuming that the acquisition_today report was the one requested by stakeholders, it is natural to want to build the DAG around it. After all, what use is a DAG if it doesn’t produce something that the stakeholders want?

We do want to produce the report, but we want to do it as efficiently and scalably as possible. The inefficiency of this DAG is easiest to explain with a hypothetical situation: if there is an error in views_today and we fix it, we have to re-run all downstream tasks in order to update the report. This impacts all but three tasks: views_raw, visitors_raw, and campaigns_raw. This is time-consuming (as stakeholders wait anxiously for their data) and compute-intensive. If past_visitors is mostly visitor-based data with one or two fields from views, re-running past_visitors because of a views change is inefficient.
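The re-run cost can be estimated by walking the graph from the task that changed. The following is a minimal sketch; the edge list is a simplified, hypothetical subset of the pipeline, not a transcription of the diagram.

```python
from collections import deque

# Hypothetical edges: task -> tasks that read its output directly.
downstream = {
    "views_raw": ["views_today"],
    "visitors_raw": ["past_visitors"],
    "campaigns_raw": ["acquisition_today"],
    "views_today": ["past_visitors", "acquisition_today"],
    "past_visitors": ["acquisition_today"],
    "acquisition_today": [],
}


def tasks_to_rerun(changed_task, edges):
    """Return every task reachable from changed_task, excluding itself."""
    seen = set()
    queue = deque(edges.get(changed_task, []))
    while queue:
        task = queue.popleft()
        if task in seen:
            continue
        seen.add(task)
        queue.extend(edges.get(task, []))
    return seen


affected = tasks_to_rerun("views_today", downstream)
print(sorted(affected))  # ['acquisition_today', 'past_visitors']
```

The more tasks that hang off a single upstream task, the larger this set grows, which is why tangled dependencies make every fix expensive.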

Additionally, not having a visits or campaign entity makes it hard to debug changes to the acquisition report. If a stakeholder notices the report has a sharp increase in visits, can they quickly isolate those visits for quality assurance? If the report shows 30 visits yesterday and 100 today, can they easily find the new visit_ids to look them up in the source system? When all building blocks of a report are materialized and available to stakeholders, they have more agency to both understand and debug the data, taking work off the data engineering team.

Lastly, history tells us that stakeholders may want more than one report off this data or a change in reports. If stakeholders request a funnel_report, using the views data but not the campaign data, this DAG does not give us the flexibility to do that. Therefore, we want to build out all the underlying entities and then combine them at the very end.

What makes a good DAG?

Below is a good DAG. Notice that:

  • There are no cycles
  • All entities are materialized independently (campaigns, visitors, views, etc.) before being combined into the acquisition_report
  • We are able to add several reports (funnel_report, customer_profile) from the same entities

Image credit – A better data pipeline
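The payoff of materializing entities first can be sketched in a few lines. The mapping below is hypothetical: the post only states that funnel_report uses views but not campaigns, so the remaining inputs are assumptions for illustration.

```python
# Hypothetical inputs for the improved design: report -> entities it reads.
report_inputs = {
    "acquisition_report": {"campaigns", "visitors", "views"},
    "funnel_report": {"visitors", "views"},
    "customer_profile": {"visitors"},
}


def reports_affected_by(entity, inputs):
    """List the reports that must be rebuilt when one entity changes."""
    return sorted(
        report for report, entities in inputs.items() if entity in entities
    )


print(reports_affected_by("campaigns", report_inputs))
# ['acquisition_report']
print(reports_affected_by("views", report_inputs))
# ['acquisition_report', 'funnel_report']
```

A fix to campaigns now touches a single report, and adding a new report is a matter of adding one more entry rather than rewiring the pipeline.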


Singleton Fails with Multiprocessing in Python

A singleton is a class designed to permit only a single instance. Singletons have a bad reputation, but they do have (limited) valid uses. They also present lots of headaches, and may throw errors when used with multiprocessing in Python. This article will explain why, and what you can do to work around it.

A Singleton in the Wild

Singleton usage is exceedingly rare in Python. I’ve been writing Python code for 5 years and never came across one until last week. Having never studied the singleton design pattern, I was perplexed by the convoluted logic. I was also frustrated by the errors that kept popping up when I was forced to use it.

The most frustrating aspect of using a singleton for me came when I tried to run some code in parallel with joblib. Inside the parallel processes, the singleton always acted like it hadn’t been instantiated yet. My parallel code only worked when I added another instantiation of the singleton inside the function called by the process. It took me a long time to figure out why.

Why Singletons Fail with Multiprocessing

The best explanation for why singletons throw errors with multiprocessing in Python is this answer from Stack Overflow.

Each of your child processes runs its own instance of the Python interpreter, hence the singleton in one process doesn’t share its state with those in another process.

https://stackoverflow.com/questions/45077043/make-singleton-class-in-multiprocessing

Your singleton instance won’t be shared across processes.
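The effect is easy to reproduce without joblib. The sketch below swaps in a stand-in for the singleton: loggers returned by the standard library's logging.getLogger, which behave like singletons because the same name always yields the same object within one process. It starts a second interpreter with subprocess, an assumption made here to keep the example short, and shows that the parent's change is invisible to the child.

```python
import logging
import subprocess
import sys

# Within one process, getLogger returns the same object for the same name.
parent_logger = logging.getLogger("demo")
parent_logger.setLevel(logging.DEBUG)
same_object = logging.getLogger("demo") is parent_logger

# Start a brand-new interpreter and ask it for the "same" logger.
child_code = "import logging; print(logging.getLogger('demo').level)"
completed = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True,
    text=True,
    check=True,
)
child_level = int(completed.stdout)

print("same object in parent:", same_object)
print("parent level:", parent_logger.level)  # logging.DEBUG
print("child level:", child_level)  # logging.NOTSET, the untouched default
```

The child builds its own copy of the object from scratch, which is the same reason a singleton can look uninstantiated inside a worker process.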

Working Around Singleton Errors in Multiprocessing

There are several ways to work around this problem. Let’s start with a basic singleton class and see how a simple parallel process will fail.

import time
from joblib import Parallel, delayed


class OnlyOne:
    """Singleton Class, inspired by
    https://python-3-patterns-idioms-test.readthedocs.io/en/latest/Singleton.html"""

    class __OnlyOne:
        def __init__(self, arg):
            if arg is None:
                raise ValueError("Pretend empty instantiation breaks code")
            self.val = arg

        def __str__(self):
            # str() so this also works when val is not a string
            return repr(self) + str(self.val)

    instance = None

    def __init__(self, arg=None):
        if not self.instance:
            self.instance = self.__OnlyOne(arg)
        else:
            self.instance.val = arg

    def __getattr__(self, name):
        return getattr(self.instance, name)


def worker(num):
    """Single worker function to run in parallel.
    Assume that this function has to do an empty
    instantiation of the singleton.
    """
    one = OnlyOne()
    time.sleep(0.1)
    one.val += num
    return one.val


# Instantiate singleton
one = OnlyOne(0)
print(one.val)

# Try to run in parallel
# Will hit the ValueError that raises with
# empty instantiation
res = Parallel(n_jobs=-1, verbose=10)(
    delayed(worker)(i) for i in range(10)
)
print(res)

In this example, the worker function needs to do an empty instantiation of the singleton because we want access to some attribute stored in it. We don’t know what value to instantiate it with because that’s the very thing we’re trying to access from the attribute.

Environment Variables

Here’s a simple solution I came up with that worked for me, and might for you as well. The solution here uses environment variables to store state across processes.

import time
from joblib import Parallel, delayed
import os


class OnlyOne:
    """Singleton Class, inspired by
    https://python-3-patterns-idioms-test.readthedocs.io/en/latest/Singleton.html
    Modified to work with parallel processes using environment
    variables to store state across processes.
    """

    class __OnlyOne:
        def __init__(self, arg):
            if arg is None:
                raise ValueError("Pretend empty instantiation breaks code")
            self.val = arg

        def __str__(self):
            # str() so this also works when val is not a string
            return repr(self) + str(self.val)

    instance = None

    def __init__(self, arg=None):
        if not self.instance:
            if arg is None:
                # look up val from env var; env vars are always strings,
                # so convert back (this example assumes an integer val)
                env_val = os.getenv('SINGLETON_VAL')
                arg = int(env_val) if env_val is not None else None
            else:
                # set env var so all workers use the same val
                # (os.environ only accepts strings)
                os.environ['SINGLETON_VAL'] = str(arg)
            self.instance = self.__OnlyOne(arg)
        else:
            self.instance.val = arg

    def __getattr__(self, name):
        return getattr(self.instance, name)


def worker(num):
    """Single worker function to run in parallel.
    Assume that this function has to do an empty
    instantiation of the singleton.
    """
    one = OnlyOne()
    time.sleep(0.1)
    one.val += num
    return one.val


# Instantiate singleton
one = OnlyOne(0)
print(one.val)

# Run in parallel worry-free
res = Parallel(n_jobs=-1, verbose=10)(
    delayed(worker)(i) for i in range(10)
)
print(res)
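One caveat with this approach is that environment variables only hold strings, so any other type has to be converted on the way in and on the way out. Here is a minimal sketch of that round trip, using a child interpreter started with subprocess in place of a joblib worker (an assumption made to keep the example self-contained).

```python
import os
import subprocess
import sys

# os.environ rejects non-string values outright.
try:
    os.environ["SINGLETON_VAL"] = 0
    rejected = False
except TypeError:
    rejected = True

# Convert to str when writing...
os.environ["SINGLETON_VAL"] = str(0)

# ...and back to int when reading in the child process, which inherits
# the parent's environment.
child_code = "import os; print(int(os.environ['SINGLETON_VAL']) + 1)"
completed = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True,
    text=True,
    check=True,
)
child_value = int(completed.stdout)

print("int rejected:", rejected)  # True
print("child computed:", child_value)  # 1
```

Values set this way are copied into each child when it starts, so later changes made inside a worker do not flow back to the parent or to other workers.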

Pass Singleton as Argument

Another solution is to simply pass the instantiated singleton instance as an argument to the worker function.

import time
from joblib import Parallel, delayed


class OnlyOne:
    """Singleton Class, inspired by
    https://python-3-patterns-idioms-test.readthedocs.io/en/latest/Singleton.html"""

    class __OnlyOne:
        def __init__(self, arg):
            if arg is None:
                raise ValueError("Pretend empty instantiation breaks code")
            self.val = arg

        def __str__(self):
            # str() so this also works when val is not a string
            return repr(self) + str(self.val)

    instance = None

    def __init__(self, arg=None):
        if not self.instance:
            self.instance = self.__OnlyOne(arg)
        else:
            self.instance.val = arg

    def __getattr__(self, name):
        return getattr(self.instance, name)


def worker(num, one):
    """Single worker function to run in parallel.
    """
    time.sleep(0.1)
    one.val += num
    return one.val


# Instantiate singleton
one = OnlyOne(0)
print(one.val)

# Run in parallel succeeds when one is passed
# as arg to worker
res = Parallel(n_jobs=-1, verbose=10)(
    delayed(worker)(i, one) for i in range(10)
)
print(res)

Popular Python Packages for Data Science

Python has a robust ecosystem of data science packages. In this article, I’ll discuss the most popular Python packages for data science, including the essentials as well as my favorite packages for visualization, natural language processing, and deep learning.

Essential Python Packages for Data Science

The vast majority of data science workflows utilize these four essential Python packages.

I recommend using Anaconda to set up your Python environment. One of its many benefits is that it automatically installs these four libraries, along with many other essential Python packages for data science.

Numpy

The fundamental package for scientific computing with Python.

https://numpy.org/

Numpy is foundational for data science workflows because of its efficient vector operations. The Numpy ndarray is a workhorse for mathematical computations in tons of useful libraries.
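As a quick illustration of what a vector operation looks like, the sketch below multiplies two arrays element by element without writing a loop. The numbers are made up.

```python
import numpy as np

prices = np.array([10.0, 20.0, 30.0])
quantities = np.array([1, 2, 3])

# One expression operates on every element at once; no Python loop needed.
revenue = prices * quantities
total = revenue.sum()

print(revenue)  # [10. 40. 90.]
print(total)  # 140.0
```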

Pandas

pandas is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool, built on top of the Python programming language.

https://pandas.pydata.org/

The Pandas dataframe is the primary data object for most data science workflows. A dataframe is basically a database table, with named columns of different data types. Numpy ndarrays, in contrast, must have the same data type for each element.
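The difference in how the two structures treat data types shows up in a small example. The column names and values below are invented for illustration.

```python
import numpy as np
import pandas as pd

# A dataframe keeps a separate data type for each named column.
df = pd.DataFrame(
    {
        "visitor_id": [1, 2, 3],
        "campaign": ["email", "search", "email"],
        "spend": [1.5, 2.0, 0.5],
    }
)
print(df.dtypes)

# Table-style operations work on column names.
spend_by_campaign = df.groupby("campaign")["spend"].sum()
print(spend_by_campaign)

# An ndarray built from mixed values falls back to a single type,
# here a string type, so the numbers are no longer numeric.
mixed = np.array([1, "email", 1.5])
print(mixed.dtype.kind)  # 'U' (unicode string)
```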

Scikit Learn

Simple and efficient tools for predictive data analysis

https://scikit-learn.org/stable/

Scikit Learn is the workhorse for machine learning pipelines. It’s built on top of Numpy, SciPy, and Matplotlib, and plays nicely with Pandas. Scikit Learn offers implementations of almost every popular machine learning algorithm, including logistic regression, random forest, support vector machines, k-means, and many more.

Matplotlib

Matplotlib is a comprehensive library for creating static, animated, and interactive visualizations in Python.

https://matplotlib.org/

Matplotlib is the foundational data visualization package for Python. Pandas and Scikit Learn both have handy visualization modules that rely on Matplotlib. It’s a very flexible and intuitive plotting library. The downside is that creating complex and aesthetically pleasing plots usually requires many lines of code.

Visualization Packages

My two favorite Python visualization packages for data science are both built on top of Matplotlib.

Seaborn

Seaborn is a Python data visualization library based on matplotlib. It provides a high-level interface for drawing attractive and informative statistical graphics.

https://seaborn.pydata.org/

Seaborn makes beautiful statistical plots with one line of code. Here are a few of my favorite examples.

A violin plot made with the Seaborn data visualization Python package can help you visualize the distribution of a variable for various slices. The violin plot displays a box and whisker plot along with a kernel density estimate of the distribution.
A joint plot made with the Seaborn data visualization Python package can help you visualize the joint distribution of two variables. The joint plot is similar to a scatter plot, but shows the density of values instead of individual observations. It also shows the univariate distributions on each axis.

Scikit Plot

There are a number of visualizations that frequently pop up in machine learning. Scikit-plot generates quick and beautiful graphs and plots with as little boilerplate as possible.

https://scikit-plot.readthedocs.io/en/stable/

Scikit Plot provides one-liners for many common machine learning plots, including confusion matrix heatmaps, ROC curves, precision-recall curves, lift curves, cumulative gains charts, and others.

Natural Language Processing Packages

Natural language processing (NLP) is my specialty within data science. There’s a lot you can accomplish with Scikit Learn for NLP, which I’ll quickly mention below, but there are two additional libraries that can really help you level-up your NLP project.

Scikit Learn

CountVectorizer and TfidfVectorizer make it easy to transform a corpus of documents into a document-term matrix. You can train a bag-of-words classification model in no time when you combine either of these with LogisticRegression.
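A bag-of-words classifier along those lines can be sketched as follows. The four-document corpus and its labels are made up, and a model trained on so little data is only useful for showing the mechanics.

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

# Tiny made-up corpus; a real project needs far more data.
docs = [
    "great product, fast shipping",
    "love it, works great",
    "terrible quality, broke fast",
    "awful support, want a refund",
]
labels = ["positive", "positive", "negative", "negative"]

# Step 1: turn raw text into a sparse matrix, one row per document
# and one column per term in the vocabulary.
vectorizer = TfidfVectorizer()
matrix = vectorizer.fit_transform(docs)
print(matrix.shape)

# Step 2: chain a vectorizer and a classifier so that raw text goes in
# and predicted labels come out.
model = make_pipeline(TfidfVectorizer(), LogisticRegression())
model.fit(docs, labels)
predictions = model.predict(["great support"])
print(predictions)
```

Swapping TfidfVectorizer for CountVectorizer changes the weighting of the terms but leaves the rest of the pipeline untouched.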

Scikit Learn also provides LatentDirichletAllocation for topic modeling with LDA. I like to pair it with pyLDAvis to produce interactive topic modeling dashboards like the one below.

Screenshot of an interactive topic modeling dashboard generated by pyLDAvis.

Spacy

Industrial strength natural language processing

https://spacy.io/

Spacy is really powerful, and in my opinion supersedes the NLTK package that used to be the gold standard for things like part-of-speech tagging, dependency parsing, and named entity recognition.

Spacy does all of those for you in one line of code without any NLP knowledge. And it’s extensible, so you can add your own entities and metadata to spans of tokens within each document.

And the displacy visualization tool is just awesome.

Transformers

🤗 Transformers provides general-purpose architectures (BERT, GPT-2, RoBERTa, XLM, DistilBert, XLNet…) for Natural Language Understanding (NLU) and Natural Language Generation (NLG) with over 32+ pretrained models in 100+ languages and deep interoperability between TensorFlow 2.0 and PyTorch.

https://huggingface.co/transformers/

State-of-the-art language models such as BERT and GPT-3 are built on a neural network architecture called the “transformer.” The Transformers library by Hugging Face allows you to apply these pre-trained models to your text data.

By doing so, you can generate vectors for each sentence in your corpus and use those vectors as features in your models. Or if your task is one that Transformers supports, you can just apply a complete model and be done. They currently support the following tasks.

  • Sequence classification
  • Extractive question answering
  • Language modeling
  • Named entity recognition
  • Summarization
  • Translation

Deep Learning Packages

Deep learning in Python is dominated by two packages: TensorFlow and PyTorch. There are others, but most data scientists use one of these two, and the split seems roughly equal. So if you want to train a neural network, I recommend picking TensorFlow or PyTorch.

TensorFlow

An end-to-end open source machine learning platform

https://www.tensorflow.org/

I use TensorFlow for all of my deep learning needs. The Keras high-level wrapper, which is now incorporated into TensorFlow 2.0, was what sold me on TensorFlow. It’s intuitive, reasonably flexible, and efficient.

TensorFlow is production ready with the TensorFlow Serving module. I’ve used it in combination with AWS SageMaker to deploy a neural network behind an API, but I wouldn’t describe TensorFlow Serving as particularly easy to use.

PyTorch

An open source machine learning framework that accelerates the path from research prototyping to production deployment

https://pytorch.org/

I’ve never personally used PyTorch (except as a backend for the Transformers library), so I probably won’t do it justice here. I’ve heard it described as more popular in academia. That quote above from their website suggests they are trying to change that image.

But my understanding is that PyTorch offers a bit more flexibility for designing novel neural network architectures than does TensorFlow. So if you plan to do research on new architectures, PyTorch might be right for you.