Orchestrate an ETL Pipeline with Dagster – Beginner's Guide

You have been learning about Dagster and now want to see, in practice, how to convert an actual Python ETL script into a pipeline orchestrated by Dagster.

Context

In this project the idea is to gain insight into how long a development team is taking to review pull requests (PRs). Slow reviews are a common issue and can be frustrating both for the reviewer and for the developer waiting for the review.

The purpose of this pipeline is to take a GitHub repository name, fetch the number of files changed and the time in hours each pull request took to be reviewed, and use this data to build a chart in a Jupyter Notebook so we can see whether the analysis makes sense to act on.

Basically, if a larger number of changed files increases the time in review too much, the team can try to create smaller PRs during the development cycle, or even act on the product requirement specifications to make them smaller.

Python script for the solution

Now let’s get to the script code. It can be found on GitHub.

The script uses some dependencies that need to be installed before running it. They can be installed using pip:

pip install PyGithub pandas matplotlib nbformat nbconvert ipykernel jupytext

1. Extract pull requests from GitHub

def extract(repo_name):
    repo = Github(ACCESS_TOKEN).get_repo(repo_name)
    closed_pulls = list(repo.get_pulls(state="closed"))
    closed_pulls_details = [
        repo.get_pull(number=pull.number) for pull in closed_pulls
    ]
    return closed_pulls_details

First of all, a GitHub access token needs to be generated so the script can call the GitHub API. The snippets here assume it is available in an ACCESS_TOKEN constant, together with the imports sketched below.
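Since only the function bodies are shown here, a minimal sketch of what the top of the script could look like (the logger setup and the token placeholder are assumptions, not copied from the original):

import logging
import pickle

import jupytext
import nbformat
import pandas as pd
from github import Github
from github.InputFileContent import InputFileContent
from nbconvert.preprocessors import ExecutePreprocessor

logger = logging.getLogger(__name__)

# Personal access token generated on GitHub (Settings > Developer settings)
ACCESS_TOKEN = "<your GitHub access token>"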

2. Transform pull requests data

def transform(pulls):
    df = pd.DataFrame(
        [
            {
                "changed_files": pull.changed_files,
                "created_at": pull.created_at,
                "merged_at": pull.merged_at,
                "time_in_review": (
                    round((pull.merged_at - pull.created_at).total_seconds() / 3600, 1)
                    if pull.created_at is not None and pull.merged_at is not None
                    else 0
                ),
            }
            for pull in pulls
        ]
    )
    df = df[df.time_in_review != 0].sort_values(by="time_in_review")
    logger.info(df)
    md_content = f"""# Github Pull Requests Review Time

```python
import pickle
pull_requests = pickle.loads({pickle.dumps(df)!r})
```

## Repo 30 pull requests with higher review time by changed files

```python
pull_requests.tail(30).reset_index().plot.bar(x="time_in_review", y="changed_files")
```
"""
    return md_content

3. Load the transformed data

def load(md_content):
    with open("output.md", "w") as f:
        f.write(md_content)

4. Report data to a GitHub gist

def report(md_file):
    nb = jupytext.reads(md_file, "md")
    ExecutePreprocessor().preprocess(nb)
    notebook_visualization = nbformat.writes(nb)
    gist = (
        Github(ACCESS_TOKEN)
        .get_user()
        .create_gist(
            public=False,
            files={
                "pull_requests_analyses.ipynb": InputFileContent(notebook_visualization),
            },
        )
    )
    return gist.html_url

5. Running the script

if __name__ == "__main__":
    # Run ETL
    input = extract("radar-parlamentar/radar")
    output = transform(input)
    load(output)

    # Run report
    with open('output.md', 'r') as f:
        file = f.read()
    report_url = report(file)
    print(report_url)

6. Problems of using an ETL script

  1. The process takes a while, especially the extract part. It would be great to iterate on the ETL steps without having to run everything every time. For example, if we want to chart the last 50 PRs instead of the last 30, we would need to make the expensive call that fetches all the PRs again, even though only the transform step changed.
  2. Besides taking a long time, the requests made to fetch the PRs from an external system can also fail. How can this be handled, and how can the pipeline make sure to retry when a request fails?
  3. In a complex pipeline the load phase can be far more involved than just saving to a local file as the current code does. It could mean loading the data to S3, a Git repository, a data warehouse, or other databases, and each of those would require manual changes to the current code.
  4. There’s no scheduling control. A pipeline like this usually needs to run on a regular basis so the data stays up to date; ideally, all we should have to do is decide what that schedule is and make sure the stakeholders are aligned with it.

Orchestrate the script in Dagster

1. Get Dagster up and running

Installing Dagster is pretty simple:

$ pip install dagster

This installs the CLI tool that helps to scaffold a project.

There are a few templates to start a project, but we’re going to use the simplest one:

$ dagster project scaffold --name pull-requests-pipeline

This creates some useful initial code, such as:

  • setup.py – lists all the dependencies that we need
  • workspace.yaml – tells Dagster where to find the code when running locally

For the dependencies, the setup.py needs:

from setuptools import find_packages, setup

setup(
    name="pull_requests_pipeline",
    packages=find_packages(exclude=["pull_requests_pipeline_tests"]),
    install_requires=[
        "dagster",
        "dagster-cloud",
        "PyGithub",
        "matplotlib",
        "pandas",
        "nbconvert",
        "nbformat",
        "ipykernel",
        "jupytext",
    ],
    extras_require={"dev": ["dagit", "pytest"]},
)

They can be installed running:

$ pip install -e '.[dev]'

There are two components that run in Dagster:

  • The UI, provided by Dagit
  • The daemon that runs the schedules

Because we’re not going to use schedules for now, we can run just the UI:

$ dagit

2. Converting the script into assets

In the assets.py file that was created during the scaffold, the Dagster asset decorator and the original script need to be imported.

from dagster import asset

import pull_request_etl

1. Extract

@asset
def extract():
    return pull_request_etl.extract("dagster-io/dagster")

One thing to take into consideration is failures. If the HTTP requests in the extract phase don’t work, the asset will be marked as failed. This can be simulated by adding a raise RuntimeError("fake failure") to stand in for the HTTP requests failing, as in the sketch below.
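A rough sketch of that simulation (the random failure is just an illustration of the API call failing intermittently, not part of the original script):

import random

from dagster import asset

@asset
def extract():
    # Simulate the GitHub API call failing some of the time
    if random.random() < 0.5:
        raise RuntimeError("fake failure")
    return pull_request_etl.extract("dagster-io/dagster")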

This can be solved by telling Dagster what the retry policy is:

from dagster import RetryPolicy

@asset(retry_policy=RetryPolicy(max_retries=5, delay=5))
def extract():
    return pull_request_etl.extract("radar-parlamentar/radar")

In this case it’s set to retry at most 5 times before failing for good, waiting 5 seconds after each failure before retrying.

2. Transform

@asset
def transform(extract):
    return pull_request_etl.transform(extract)

Another problem we can solve at this point is the scheduling of the asset, which can be done by declaring what the freshness policy of the asset will be:

freshness_policy=FreshnessPolicy(maximum_lag_minutes=30)

from dagster import FreshnessPolicy

@asset(freshness_policy=FreshnessPolicy(maximum_lag_minutes=30))
def transform(extract):
    return pull_request_etl.transform(extract)

So if this is shipped to production, Dagster will make sure the asset is rematerialized often enough that it is never more than 30 minutes out of date, and the upstream assets it depends on will also run again when needed.

3. Load

@asset
def load(transform):
    return pull_request_etl.load(transform)

One problem here is that the load step is still tightly coupled to the code: it depends on storing to the local file system, so changing it to store on S3 or in a GitHub repository, for example, would require changing the code itself.

Dagster has the concept of an I/O manager, which makes it easy to change inputs and outputs through configuration, even across environments. Separating where things are stored from the pipeline logic is very convenient.

So the I/O manager basically saves assets to storage and loads them back from storage, as in the sketch below.
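A minimal sketch of a custom I/O manager that keeps writing markdown files locally (the class and function names are illustrative, not from the original project); swapping the storage for S3 or a Git repository would then only mean swapping this I/O manager:

from dagster import IOManager, io_manager

class MarkdownFileIOManager(IOManager):
    def handle_output(self, context, obj):
        # Save the asset's output to a local file named after the asset
        with open(f"{context.asset_key.path[-1]}.md", "w") as f:
            f.write(obj)

    def load_input(self, context):
        # Load the upstream asset's output back from that file
        with open(f"{context.upstream_output.asset_key.path[-1]}.md") as f:
            return f.read()

@io_manager
def markdown_file_io_manager():
    return MarkdownFileIOManager()

The I/O manager is then attached to the assets through the resources configuration, so the assets themselves only return plain Python values.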

4. Report

@asset
def report(context, load):
    with open('output.md', 'r') as f:
        file = f.read()
    report_url = pull_request_etl.report(file)
    context.log.info(report_url)
    return report_url

3. Assets observability
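One way to get observability over the assets is to attach metadata to each materialization so it shows up in Dagit. A minimal sketch, assuming we enrich the transform asset (the metadata keys are illustrative, not from the original script):

from dagster import MetadataValue, Output, asset

@asset
def transform(extract):
    md_content = pull_request_etl.transform(extract)
    # Attach metadata to the materialization so it can be inspected in Dagit
    return Output(
        md_content,
        metadata={
            "num_pull_requests": len(extract),
            "notebook_preview": MetadataValue.md(md_content[:500]),
        },
    )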

4. Creating schedule
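A sketch of how a schedule could be wired up, assuming a daily run at 09:00 (the job name and cron expression are assumptions); note that the daemon mentioned earlier needs to be running for schedules to fire:

from dagster import AssetSelection, Definitions, ScheduleDefinition, define_asset_job

# A job that materializes every asset in the pipeline
pull_requests_job = define_asset_job(
    name="pull_requests_job",
    selection=AssetSelection.all(),
)

# Run it every day at 09:00
pull_requests_schedule = ScheduleDefinition(
    job=pull_requests_job,
    cron_schedule="0 9 * * *",
)

defs = Definitions(
    assets=[extract, transform, load, report],
    schedules=[pull_requests_schedule],
)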

5. Handling environment variables
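A minimal sketch of reading the GitHub token from an environment variable instead of hard-coding it in the script (the variable name GITHUB_ACCESS_TOKEN is an assumption):

import os

# Read the GitHub token from the environment instead of hard-coding it
ACCESS_TOKEN = os.getenv("GITHUB_ACCESS_TOKEN")
if ACCESS_TOKEN is None:
    raise RuntimeError("GITHUB_ACCESS_TOKEN environment variable is not set")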

Future projects

To practice what was taught in this article, and to keep helping development teams analyze how the size of a pull request affects the effectiveness of a review, the same pipeline can be reproduced, this time analyzing the number of comments on a PR relative to the number of files and lines changed.

10 lines of code = 10 issues.

500 lines of code = “looks fine”.

Code reviews.

I Am Devloper (@iamdevloper) November 5, 2013

This is a fun quote representing the problem in question.

So why not put your knowledge of Dagster pipelines into practice on a real problem?

Author

Marco William

Software Engineer
