The instructions below will guide you through a simple Python data transformation. If you are just getting started with data transformation, consider going through the batch pipeline tutorial for Pipeline Builder or Code Repositories first.
This tutorial walks through how you can use Python transforms to transform a spreadsheet of recent meteorite discoveries to a usable dataset ready for analysis.
This tutorial uses data from NASA’s Open Data Portal. You can follow along in your own code repository with this sample dataset:
This dataset contains data about meteorites that have been found on Earth. Note that the data has been cleaned to make it easier to work with.
The dataset includes name, mass, classification, and other identifying information for each meteorite, along with the year it was discovered and coordinates of where it was found. It is good practice to open the CSV to review the data before uploading it into Foundry.
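Before uploading, a quick local look at the header and a few rows can catch formatting surprises. Below is a minimal sketch using only the standard library; the sample rows are illustrative stand-ins for the real NASA data, and in practice you would open the downloaded CSV file instead:

```python
import csv
import io

# Illustrative sample in the same shape as the meteorite CSV (not real data).
# In practice, replace this with: open("meteorite_landings.csv", newline="")
sample = io.StringIO(
    "name,class,mass,year,nametype\n"
    "Aachen,L5,21.0,1880,Valid\n"
    "Abee,EH4,107000.0,1952,Valid\n"
)
rows = list(csv.DictReader(sample))
print(sorted(rows[0].keys()))  # column names to verify before upload
print(len(rows))               # row count
```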
Get started by creating a Python code repository.
Alternatively, you can copy your local Python repository into Code Repositories with the following steps:
1. Create a new Python code repository, as described above.
2. In your local repository, remove the previous Git origin (if you cloned it from GitHub, for example): git remote remove origin
3. Add your code repository's Git remote URL: git remote add origin <repository_url>. You can find this URL in the top right corner of the Code Repositories interface: select the green Clone button, then copy the Git remote URL. Confirm the new remote by running git remote -v, which should return the code repository URL.
4. Merge the current master branch (or another branch of your choosing) in Code Repositories into your local branch: git merge master. If an error about refusing to merge unrelated histories occurs, run git merge master --allow-unrelated-histories instead; this tells Git to combine the two previously unrelated histories into one. The merge brings essential files into your local repository that are required to make commits and changes in Code Repositories.
5. Create a new branch and name it (testbranch, for example): git checkout -b testbranch
6. Make your changes and commit them to your branch.
7. Run git push, then confirm that the new branch appears in the Code Repositories interface and that checks pass.
Learn more about local development in Code Repositories.
Navigate to your Python transforms repository. The default examples.py file contains example code to help you get started. Note that you can choose between pandas, Polars, and Spark compute engines. For more information on choosing a compute engine, refer to the compute engine comparison documentation.

Start by creating a new file in src/myproject/datasets and name it meteor_analysis.py to organize your analysis. Make sure you import the required functions and classes. Define a transformation that takes your meteorite_landings dataset as input and creates meteorite_landings_cleaned as its output:
Polars:

```python
from transforms.api import transform, Input, Output


@transform.using(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.polars()
    # Your data transformation logic
    output.write_table(df)
```
pandas:

```python
from transforms.api import transform, Input, Output


@transform.using(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.pandas()
    # Your data transformation logic
    output.write_table(df)
```
Spark:

```python
from transforms.api import transform, Input, Output


@transform(
    # Replace this with your output dataset path
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    # Replace this with your input dataset path
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.dataframe()
    # Your data transformation logic
    output.write_dataframe(df)
```
Now, suppose you want to filter your input dataset down to the "Valid" meteorites discovered in or after 1950. Update your data transformation logic to filter the meteorites by nametype and year:
Polars:

```python
import polars as pl

from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = (
        meteorite_landings.polars()
        .filter(pl.col("nametype") == "Valid")
        .filter(pl.col("year") >= 1950)
    )
    output.write_table(df)
```
pandas:

```python
from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.pandas()
    df = df[(df["nametype"] == "Valid") & (df["year"] >= 1950)]
    output.write_table(df)
```
Spark:

```python
from transforms.api import transform, Input, Output


@transform(
    output=Output("/Users/jsmith/meteorite_landings_cleaned"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def clean(output, meteorite_landings):
    df = meteorite_landings.dataframe()
    df = df.filter(df.nametype == "Valid").filter(df.year >= 1950)
    output.write_dataframe(df)
```
To build your resulting dataset, commit your changes and select Build in the top right corner. For more information about building datasets in Code Repositories, review the Create a simple batch pipeline tutorial.
With Python transforms, you can create multiple output datasets in a single Python file.
Let’s say you want to filter down even further, to only the meteorites that are particularly large for their meteorite type. To do so, you will need to:

1. Calculate the average mass for each meteorite class.
2. Compare each meteorite's mass to the average for its class.
3. Keep only the meteorites with a greater-than-average mass.
First, add a data transformation to meteor_analysis.py that finds the average mass for each meteorite type. This transformation takes your meteorite_landings dataset as input and creates meteorite_stats as its output:
Polars:

```python
import polars as pl

from transforms.api import transform, Input, Output


@transform.using(
    # Output dataset name must be unique
    output=Output("/Users/jsmith/meteorite_stats"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def stats(output, meteorite_landings):
    df = (
        meteorite_landings.polars()
        .group_by("class")
        .agg(pl.col("mass").mean().alias("avg_mass_per_class"))
    )
    output.write_table(df)
```
pandas:

```python
from transforms.api import transform, Input, Output


@transform.using(
    # Output dataset name must be unique
    output=Output("/Users/jsmith/meteorite_stats"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def stats(output, meteorite_landings):
    df = (
        meteorite_landings.pandas()
        .groupby("class", as_index=False)["mass"]
        .mean()
        .rename(columns={"mass": "avg_mass_per_class"})
    )
    output.write_table(df)
```
Spark:

```python
from pyspark.sql import functions as F

from transforms.api import transform, Input, Output


@transform(
    # Output dataset name must be unique
    output=Output("/Users/jsmith/meteorite_stats"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
)
def stats(output, meteorite_landings):
    df = (
        meteorite_landings.dataframe()
        .groupBy("class")
        .agg(F.mean("mass").alias("avg_mass_per_class"))
    )
    output.write_dataframe(df)
```
Next, create a data transformation that compares each meteorite's mass to the average mass for its type. The information needed for this transformation is spread across the meteorite_landings and meteorite_stats tables that you created in this tutorial. Join the two datasets, then filter the result to find the meteorites that have a greater-than-average mass, as shown below:
Polars:

```python
import polars as pl

from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.polars()
    df_stats = meteorite_stats.polars()
    enriched = df_landings.join(df_stats, on="class", how="inner")
    enriched = enriched.with_columns(
        (pl.col("mass") > pl.col("avg_mass_per_class")).alias("greater_mass")
    )
    result = enriched.filter(pl.col("greater_mass"))
    output.write_table(result)
```
pandas:

```python
from transforms.api import transform, Input, Output


@transform.using(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.pandas()
    df_stats = meteorite_stats.pandas()
    enriched = df_landings.merge(df_stats, on="class", how="inner")
    enriched["greater_mass"] = enriched["mass"] > enriched["avg_mass_per_class"]
    result = enriched[enriched["greater_mass"]]
    output.write_table(result)
```
Spark:

```python
from transforms.api import transform, Input, Output


@transform(
    output=Output("/Users/jsmith/meteorite_enriched"),
    meteorite_landings=Input("/Users/jsmith/meteorite_landings"),
    meteorite_stats=Input("/Users/jsmith/meteorite_stats"),
)
def enriched(output, meteorite_landings, meteorite_stats):
    df_landings = meteorite_landings.dataframe()
    df_stats = meteorite_stats.dataframe()
    enriched_together = df_landings.join(df_stats, "class")
    greater_mass = enriched_together.withColumn(
        "greater_mass",
        enriched_together.mass > enriched_together.avg_mass_per_class,
    )
    result = greater_mass.filter("greater_mass")
    output.write_dataframe(result)
```
Now, you can further analyze the resulting meteorite_enriched dataset by exploring it in Contour.
So far, you created a dataset that contains meteorites of all types with a greater-than-average mass. The next step is to create separate datasets for each meteorite type. With Python transforms, you can use a for-loop to apply the same data transformation to each type of meteorite. For more information on applying the same data transformation to different inputs, refer to the section on transform generation.
Create a new file in src/myproject/datasets and name it meteor_class.py. Note that you could continue writing your data transformation code in the meteor_analysis.py file, but this tutorial uses a new file to separate the data transformation logic.
To create separate datasets for each meteorite type, filter the meteorite_enriched dataset by class. Then, define a transform_generator function that applies the same data transformation logic to each of the meteorite types you want to analyze:
Polars:

```python
import polars as pl

from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    for source in sources:

        @transform.using(
            output=Output(f"/Users/jsmith/meteorite_{source}"),
            my_input=Input("/Users/jsmith/meteorite_enriched"),
        )
        def filter_by_source(output, my_input, source=source):
            df = my_input.polars()
            result = df.filter(pl.col("class") == source)
            output.write_table(result)

        transforms.append(filter_by_source)
    return transforms


TRANSFORMS = transform_generator(["L6", "H5", "H4"])
```
pandas:

```python
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    for source in sources:

        @transform.using(
            output=Output(f"/Users/jsmith/meteorite_{source}"),
            my_input=Input("/Users/jsmith/meteorite_enriched"),
        )
        def filter_by_source(output, my_input, source=source):
            df = my_input.pandas()
            result = df[df["class"] == source]
            output.write_table(result)

        transforms.append(filter_by_source)
    return transforms


TRANSFORMS = transform_generator(["L6", "H5", "H4"])
```
Spark:

```python
from transforms.api import transform, Input, Output


def transform_generator(sources):
    transforms = []
    for source in sources:

        @transform(
            output=Output(f"/Users/jsmith/meteorite_{source}"),
            my_input=Input("/Users/jsmith/meteorite_enriched"),
        )
        def filter_by_source(output, my_input, source=source):
            df = my_input.dataframe()
            result = df.filter(df["class"] == source)
            output.write_dataframe(result)

        transforms.append(filter_by_source)
    return transforms


TRANSFORMS = transform_generator(["L6", "H5", "H4"])
```
This creates a transformation that filters the meteorite dataset by class. Note that you must pass source=source into the filter_by_source function in order to capture the source parameter in the function's scope.
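The need for source=source comes from Python's late-binding closures, not from anything Foundry-specific: a closure looks up the loop variable when the function runs, not when it is defined. A standalone sketch of the difference:

```python
# Demonstrates why source=source is needed: without it, every generated
# function sees the loop variable's final value (late binding).
def make_filters_buggy(sources):
    return [lambda: source for source in sources]  # all share one 'source'

def make_filters_fixed(sources):
    # The default argument evaluates immediately, capturing each value.
    return [lambda source=source: source for source in sources]

buggy = [f() for f in make_filters_buggy(["L6", "H5", "H4"])]
fixed = [f() for f in make_filters_fixed(["L6", "H5", "H4"])]
print(buggy)  # ['H4', 'H4', 'H4'] - every closure sees the last value
print(fixed)  # ['L6', 'H5', 'H4'] - each default captured its own value
```

The same pattern applies to any function defined inside a loop, which is why the generated filter_by_source above takes source=source as a keyword argument.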
For the initial data transformation created in the meteor_analysis.py file, you did not have to do any additional configuration to add transforms to your project's pipeline. This is because the default Python project structure uses automatic registration to discover all transform objects within your datasets folder.
To add this final transformation to your project's pipeline using automatic registration, you must assign the generated transforms to a module-level variable as a list. In the example above, we used the variable TRANSFORMS. For more information about automatic registration and transform generators, refer to the section on transforms generation in the Python transforms documentation.
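Conceptually, list-based registration works because the generated objects become reachable through a module-level name, so a discovery pass over the module's attributes can flatten any lists it finds. A simplified, hypothetical illustration of that idea (this is not Foundry's actual discovery code; the dict stands in for real transform objects):

```python
# Hypothetical stand-in for a real transform object
def fake_transform(name):
    return {"kind": "transform", "name": name}

# Stand-in for a module namespace after meteor_class.py is imported:
# TRANSFORMS is a module-level list, alongside unrelated attributes.
module_namespace = {
    "TRANSFORMS": [fake_transform(s) for s in ["L6", "H5", "H4"]],
    "helper_value": 42,  # non-transform attributes are ignored
}

# A discovery pass flattens lists and keeps only transform objects.
discovered = []
for value in module_namespace.values():
    candidates = value if isinstance(value, list) else [value]
    discovered.extend(
        c for c in candidates
        if isinstance(c, dict) and c.get("kind") == "transform"
    )
print([t["name"] for t in discovered])  # one entry per generated transform
```

If the generator's return value were never assigned to a module-level variable, the objects would be unreachable from the module namespace and nothing would be registered.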