Data ingestion and formatting

This notebook explains how to convert the Climate TRACE dataset to a format that is more appropriate for data science.

Note

This section is relevant for data engineers, or data scientists who want to understand how the data has been prepared. Skip if you just want to access the final, prepared data.

Warning

This notebook requires significant resources (300GB disk and 10GB memory for all gases).

The original data from Climate TRACE is offered as a series of CSV files bundled in ZIP archives. That format is universally understood, but it is not the most effective for analysis with data science tools. In particular, it is large: the source data, uncompressed, is about 100GB for each gas! This is the size at which most people would consider this project to be “big data” or at least “medium data”. With the proper choice of data storage, we will bring it down to a breezy “small data” without losing information along the way.

Instead, we are going to use the Parquet format. This format has a number of advantages:

  • it is column-based: data systems can process big chunks of data at once, rather than line by line. Also, depending on the information requested, systems read only the relevant columns and skip the rest very effectively.

  • it is universal: most modern data systems are able to read it.

  • it is structured: basic type information (numbers, categories, …) is preserved.

Looking at the code, we are performing a few tricks:

Compacting the data We minimize the size of the files by taking advantage of the structure of the data. In particular, we know that in many cases values are part of known enumerations (sectors, …). We replace all of these with Polars enumerations (the polars.Enum data type). Not only does this make the files smaller, it also allows data systems to apply clever optimizations to complex operations such as joins.

Lazy reading If we were to read all the source data using a traditional system such as Excel or Pandas, we would need a serious amount of memory: the zipped files alone are more than 5GB. Polars is capable of reading straight from the zip file in a streaming fashion. This is what Polars calls a lazy dataframe, or LazyFrame. Even for complicated operations such as joining the source files with the confidence information, Polars only uses 3GB of memory on my machine. In fact, this way of working is so fast that the ctrace package reads all the country emissions data directly from the zip files in less than a second.

Using known enumerations You will see in the source code that nearly all the variables, such as column names and the names of gases and sectors, are replaced by constants such as CH4, …. You can rely on them for autocompletion.

%load_ext autoreload
%autoreload 2
import logging
logging.basicConfig(level=logging.INFO)
import os
import polars as pl
from ctrace.constants import *
import ctrace as ct
import pyarrow.parquet
from dds import data_function
from pathlib import Path
import duckdb
import tempfile
import shutil
import dds
import huggingface_hub
import huggingface_hub.utils
logging.getLogger("dds").setLevel(logging.WARNING)
dds.accept_module(ct)
os.environ["POLARS_TEMP_DIR"] = os.path.join(tempfile.gettempdir(), "polars")
duckdb.sql("SET temp_directory = '{d}'".format(d=os.path.join(tempfile.gettempdir(), "duckdb")))
duckdb.sql("SET memory_limit = '8GB'")
duckdb.sql("SELECT current_setting('temp_directory')")
┌───────────────────────────────────┐
│ current_setting('temp_directory') │
│              varchar              │
├───────────────────────────────────┤
│ /media/tjhunter/DATA/temp/duckdb  │
└───────────────────────────────────┘

Creating optimized parquet files for source data

This first section creates files that are the most effective for reading and querying. The general approach is as follows:

  1. Join the source and source confidence CSV files and write them as parquet files for each subsector

  2. Aggregate by year into a yearly parquet file

  3. Optimize this parquet file for reading

This first command creates parquet files that join the source and source confidences for each subsector, and returns a list of all the created files.

In this notebook, another trick is to define the transformations as data functions. In short, the body of a data function only runs if its source code has changed. This makes rerunning the notebook very fast: results are only recomputed when something has changed in the source code.
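To give an intuition for the idea, here is a simplified sketch of code-based caching written with the standard library only. It is not how the dds package is implemented: `cached_by_code` is a hypothetical decorator, and the cache lives in memory rather than on disk.

```python
import hashlib

_cache = {}  # maps (path, code fingerprint) -> stored result


def cached_by_code(path):
    """Hypothetical decorator: rerun the function only when its code changes."""

    def decorator(func):
        code = func.__code__
        # Fingerprint the compiled body and its constants.
        fingerprint = hashlib.sha256(
            code.co_code + repr(code.co_consts).encode()
        ).hexdigest()

        def wrapper():
            key = (path, fingerprint)
            if key not in _cache:
                _cache[key] = func()  # first call: actually compute
            return _cache[key]  # later calls: reuse the stored result

        return wrapper

    return decorator


calls = []


@cached_by_code("/demo")
def expensive():
    calls.append(1)  # records how many times the body really ran
    return 42


first = expensive()
second = expensive()
print(first, second, len(calls))
```

Editing the body of `expensive` changes the fingerprint, so the next call recomputes the result instead of reusing the stored one.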

@data_function("/data_sources")
def load_sources():
    (_, files) = ct.data.load_source_compact()
    return files

load_sources()
[PosixPath('/media/tjhunter/DATA/temp/co2/cropland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/enteric-fermentation-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/enteric-fermentation-cattle-pasture_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/manure-left-on-pasture-cattle_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/manure-management-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/rice-cultivation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/synthetic-fertilizer-application_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/non-residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/forest-land-clearing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/forest-land-degradation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/forest-land-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/net-forest-land_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/net-shrubgrass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/net-wetland_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/removals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/shrubgrass-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/water-reservoirs_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/wetland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/coal-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/oil-and-gas-production_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/oil-and-gas-refining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/oil-and-gas-transport_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/aluminum_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/cement_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/food-beverage-tobacco_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/glass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/iron-and-steel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/lime_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/other-chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/other-manufacturing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/other-metals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/petrochemical-steam-cracking_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/pulp-and-paper_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/textiles-leather-apparel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/bauxite-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/copper-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/iron-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/electricity-generation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/domestic-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/domestic-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/international-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/international-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/road-transportation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/domestic-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/industrial-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2/solid-waste-disposal_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/cropland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/enteric-fermentation-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/enteric-fermentation-cattle-pasture_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/manure-left-on-pasture-cattle_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/manure-management-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/rice-cultivation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/synthetic-fertilizer-application_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/non-residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/forest-land-clearing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/forest-land-degradation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/forest-land-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/net-forest-land_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/net-shrubgrass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/net-wetland_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/removals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/shrubgrass-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/water-reservoirs_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/wetland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/coal-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/oil-and-gas-production_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/oil-and-gas-refining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/oil-and-gas-transport_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/aluminum_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/cement_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/food-beverage-tobacco_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/glass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/iron-and-steel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/lime_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/other-chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/other-manufacturing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/other-metals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/petrochemical-steam-cracking_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/pulp-and-paper_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/textiles-leather-apparel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/bauxite-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/copper-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/iron-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/electricity-generation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/domestic-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/domestic-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/international-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/international-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/road-transportation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/domestic-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/industrial-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/ch4/solid-waste-disposal_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/cropland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/enteric-fermentation-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/enteric-fermentation-cattle-pasture_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/manure-left-on-pasture-cattle_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/manure-management-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/rice-cultivation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/synthetic-fertilizer-application_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/non-residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/forest-land-clearing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/forest-land-degradation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/forest-land-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/net-forest-land_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/net-shrubgrass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/net-wetland_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/removals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/shrubgrass-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/water-reservoirs_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/wetland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/coal-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/oil-and-gas-production_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/oil-and-gas-refining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/oil-and-gas-transport_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/aluminum_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/cement_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/food-beverage-tobacco_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/glass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/iron-and-steel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/lime_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/other-chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/other-manufacturing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/other-metals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/petrochemical-steam-cracking_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/pulp-and-paper_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/textiles-leather-apparel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/bauxite-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/copper-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/iron-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/electricity-generation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/domestic-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/domestic-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/international-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/international-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/road-transportation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/domestic-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/industrial-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/n2o/solid-waste-disposal_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/cropland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/enteric-fermentation-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/enteric-fermentation-cattle-pasture_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/manure-left-on-pasture-cattle_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/manure-management-cattle-operation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/rice-cultivation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/synthetic-fertilizer-application_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/non-residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/residential-onsite-fuel-usage_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/forest-land-clearing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/forest-land-degradation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/forest-land-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/net-forest-land_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/net-shrubgrass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/net-wetland_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/removals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/shrubgrass-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/water-reservoirs_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/wetland-fires_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/coal-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/oil-and-gas-production_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/oil-and-gas-refining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/oil-and-gas-transport_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/aluminum_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/cement_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/food-beverage-tobacco_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/glass_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/iron-and-steel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/lime_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/other-chemicals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/other-manufacturing_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/other-metals_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/petrochemical-steam-cracking_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/pulp-and-paper_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/textiles-leather-apparel_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/bauxite-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/copper-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/iron-mining_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/electricity-generation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/domestic-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/domestic-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/international-aviation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/international-shipping_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/road-transportation_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/domestic-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/industrial-wastewater-treatment-and-discharge_emissions_sources.parquet'),
 PosixPath('/media/tjhunter/DATA/temp/co2e_100yr/solid-waste-disposal_emissions_sources.parquet')]

To help with the loading, the data is partitioned by year. This is the most relevant split for most users, who are expected to look at specific years and sectors (especially the latest year). It reduces the amount of data to load.

Let us have a quick peek at the data in one of these files. It already looks pretty good: a lot of the redundant data, such as the enumerations, has been deduplicated. All the enumeration data is now stored as integers; this is what dictionary<values=string, indices=int32, ordered=0> means. It is not quite ready for high performance, however.

from pyarrow.parquet import read_table
fname = load_sources()[0]
print(fname)
read_table(fname)
/media/tjhunter/DATA/temp/co2/cropland-fires_emissions_sources.parquet
pyarrow.Table
source_id: uint64
iso3_country: dictionary<values=string, indices=int32, ordered=0>
sector: dictionary<values=string, indices=int32, ordered=0>
subsector: dictionary<values=string, indices=int32, ordered=0>
original_inventory_sector: dictionary<values=string, indices=int32, ordered=0>
start_time: timestamp[ms, tz=UTC]
end_time: timestamp[ms, tz=UTC]
temporal_granularity: dictionary<values=string, indices=int32, ordered=0>
gas: dictionary<values=string, indices=int32, ordered=0>
emissions_quantity: double
emissions_factor: double
emissions_factor_units: large_string
capacity: double
capacity_units: large_string
capacity_factor: double
activity: double
activity_units: large_string
created_date: timestamp[ms, tz=UTC]
modified_date: timestamp[ms, tz=UTC]
source_name: large_string
source_type: large_string
lat: double
lon: double
other1: large_string
other2: large_string
other3: large_string
other4: large_string
other5: large_string
other6: large_string
other7: large_string
other8: large_string
other9: large_string
other10: large_string
other11: large_string
other12: large_string
other1_def: large_string
other2_def: large_string
other3_def: large_string
other4_def: large_string
other5_def: large_string
other6_def: large_string
other7_def: large_string
other8_def: large_string
other9_def: large_string
other10_def: large_string
other11_def: large_string
other12_def: large_string
geometry_ref: large_string
conf_source_type: dictionary<values=string, indices=int32, ordered=0>
conf_capacity: dictionary<values=string, indices=int32, ordered=0>
conf_capacity_factor: dictionary<values=string, indices=int32, ordered=0>
conf_activity: dictionary<values=string, indices=int32, ordered=0>
conf_emissions_factor: dictionary<values=string, indices=int32, ordered=0>
conf_emissions_quantity: dictionary<values=string, indices=int32, ordered=0>
----
source_id: [[10760226,10760226,10760226,10760226,10760226,...,10792989,10792989,10792989,10792989,10792989],[10792989,10792989,10792989,10792989,10792989,...,10812836,10812836,10812836,10812836,10812836],...,[11298418,11298418,11298418,11298418,11298418,...,11301759,11301759,11301759,11301759,11301759],[11301822,11301822,11301822,11301822,11301822,...,11303229,11303229,11303229,11303229,11303229]]
iso3_country: [  -- dictionary:
["ABW","AFG","AGO","AIA","ALA",...,"ZWE","ZNC","UNK","SCG","XAD"]  -- indices:
[1,1,1,1,1,...,23,23,23,23,23],  -- dictionary:
["ABW","AFG","AGO","AIA","ALA",...,"ZWE","ZNC","UNK","SCG","XAD"]  -- indices:
[23,23,23,23,23,...,32,32,32,32,32],...,  -- dictionary:
["ABW","AFG","AGO","AIA","ALA",...,"ZWE","ZNC","UNK","SCG","XAD"]  -- indices:
[234,234,234,234,234,...,238,238,238,238,238],  -- dictionary:
["ABW","AFG","AGO","AIA","ALA",...,"ZWE","ZNC","UNK","SCG","XAD"]  -- indices:
[238,238,238,238,238,...,249,249,249,249,249]]
sector: [  -- dictionary:
["agriculture","buildings","fluorinated-gases","forestry-and-land-use","fossil-fuel-operations","manufacturing","mineral-extraction","power","transportation","waste"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["agriculture","buildings","fluorinated-gases","forestry-and-land-use","fossil-fuel-operations","manufacturing","mineral-extraction","power","transportation","waste"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],...,  -- dictionary:
["agriculture","buildings","fluorinated-gases","forestry-and-land-use","fossil-fuel-operations","manufacturing","mineral-extraction","power","transportation","waste"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["agriculture","buildings","fluorinated-gases","forestry-and-land-use","fossil-fuel-operations","manufacturing","mineral-extraction","power","transportation","waste"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0]]
subsector: [  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"synthetic-fertilizer-application","textiles-leather-apparel","water-reservoirs","wetland-fires","wood-and-wood-products"]  -- indices:
[8,8,8,8,8,...,8,8,8,8,8],  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"synthetic-fertilizer-application","textiles-leather-apparel","water-reservoirs","wetland-fires","wood-and-wood-products"]  -- indices:
[8,8,8,8,8,...,8,8,8,8,8],...,  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"synthetic-fertilizer-application","textiles-leather-apparel","water-reservoirs","wetland-fires","wood-and-wood-products"]  -- indices:
[8,8,8,8,8,...,8,8,8,8,8],  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"synthetic-fertilizer-application","textiles-leather-apparel","water-reservoirs","wetland-fires","wood-and-wood-products"]  -- indices:
[8,8,8,8,8,...,8,8,8,8,8]]
original_inventory_sector: [  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"steel","synthetic-fertilizer-application","wastewater-treatment-and-discharge","water-reservoirs","wetland-fires"]  -- indices:
[null,null,null,null,null,...,null,null,null,null,null],  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"steel","synthetic-fertilizer-application","wastewater-treatment-and-discharge","water-reservoirs","wetland-fires"]  -- indices:
[null,null,null,null,null,...,null,null,null,null,null],...,  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"steel","synthetic-fertilizer-application","wastewater-treatment-and-discharge","water-reservoirs","wetland-fires"]  -- indices:
[null,null,null,null,null,...,null,null,null,null,null],  -- dictionary:
["aluminum","bauxite-mining","biological-treatment-of-solid-waste-and-biogenic","cement","chemicals",...,"steel","synthetic-fertilizer-application","wastewater-treatment-and-discharge","water-reservoirs","wetland-fires"]  -- indices:
[null,null,null,null,null,...,null,null,null,null,null]]
start_time: [[2021-01-01 00:00:00.000Z,2021-02-01 00:00:00.000Z,2021-03-01 00:00:00.000Z,2021-04-01 00:00:00.000Z,2021-05-01 00:00:00.000Z,...,2023-04-01 00:00:00.000Z,2023-05-01 00:00:00.000Z,2023-06-01 00:00:00.000Z,2023-07-01 00:00:00.000Z,2023-08-01 00:00:00.000Z],[2023-09-01 00:00:00.000Z,2023-10-01 00:00:00.000Z,2023-11-01 00:00:00.000Z,2023-12-01 00:00:00.000Z,2024-01-01 00:00:00.000Z,...,2021-12-01 00:00:00.000Z,2022-01-01 00:00:00.000Z,2022-02-01 00:00:00.000Z,2022-03-01 00:00:00.000Z,2022-04-01 00:00:00.000Z],...,[2022-05-01 00:00:00.000Z,2022-06-01 00:00:00.000Z,2022-07-01 00:00:00.000Z,2022-08-01 00:00:00.000Z,2022-09-01 00:00:00.000Z,...,2024-08-01 00:00:00.000Z,2024-09-01 00:00:00.000Z,2024-10-01 00:00:00.000Z,2024-11-01 00:00:00.000Z,2024-12-01 00:00:00.000Z],[2021-01-01 00:00:00.000Z,2021-02-01 00:00:00.000Z,2021-03-01 00:00:00.000Z,2021-04-01 00:00:00.000Z,2021-05-01 00:00:00.000Z,...,2024-08-01 00:00:00.000Z,2024-09-01 00:00:00.000Z,2024-10-01 00:00:00.000Z,2024-11-01 00:00:00.000Z,2024-12-01 00:00:00.000Z]]
end_time: [[2021-01-31 00:00:00.000Z,2021-02-28 00:00:00.000Z,2021-03-31 00:00:00.000Z,2021-04-30 00:00:00.000Z,2021-05-31 00:00:00.000Z,...,2023-04-30 00:00:00.000Z,2023-05-31 00:00:00.000Z,2023-06-30 00:00:00.000Z,2023-07-31 00:00:00.000Z,2023-08-31 00:00:00.000Z],[2023-09-30 00:00:00.000Z,2023-10-31 00:00:00.000Z,2023-11-30 00:00:00.000Z,2023-12-31 00:00:00.000Z,2024-01-31 00:00:00.000Z,...,2021-12-31 00:00:00.000Z,2022-01-31 00:00:00.000Z,2022-02-28 00:00:00.000Z,2022-03-31 00:00:00.000Z,2022-04-30 00:00:00.000Z],...,[2022-05-31 00:00:00.000Z,2022-06-30 00:00:00.000Z,2022-07-31 00:00:00.000Z,2022-08-31 00:00:00.000Z,2022-09-30 00:00:00.000Z,...,2024-08-31 00:00:00.000Z,2024-09-30 00:00:00.000Z,2024-10-31 00:00:00.000Z,2024-11-30 00:00:00.000Z,2024-12-31 00:00:00.000Z],[2021-01-31 00:00:00.000Z,2021-02-28 00:00:00.000Z,2021-03-31 00:00:00.000Z,2021-04-30 00:00:00.000Z,2021-05-31 00:00:00.000Z,...,2024-08-31 00:00:00.000Z,2024-09-30 00:00:00.000Z,2024-10-31 00:00:00.000Z,2024-11-30 00:00:00.000Z,2024-12-31 00:00:00.000Z]]
temporal_granularity: [  -- dictionary:
["annual","other","month","week","day","hour"]  -- indices:
[2,2,2,2,2,...,2,2,2,2,2],  -- dictionary:
["annual","other","month","week","day","hour"]  -- indices:
[2,2,2,2,2,...,2,2,2,2,2],...,  -- dictionary:
["annual","other","month","week","day","hour"]  -- indices:
[2,2,2,2,2,...,2,2,2,2,2],  -- dictionary:
["annual","other","month","week","day","hour"]  -- indices:
[2,2,2,2,2,...,2,2,2,2,2]]
gas: [  -- dictionary:
["co2","ch4","n2o","co2e_100yr"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["co2","ch4","n2o","co2e_100yr"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],...,  -- dictionary:
["co2","ch4","n2o","co2e_100yr"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["co2","ch4","n2o","co2e_100yr"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0]]
emissions_quantity: [[4.8278737861685945,9.035360329705544,45.38064148636446,61.32296197461828,38.16818200021535,...,269.018757173264,175.31207076887267,91.39990500605398,58.724632699989535,80.81230569281288],[211.330903793614,292.1079909047341,194.5476801860844,84.82207853739389,43.68382457080989,...,220.01629117534543,128.19921825691085,181.80045288320784,525.6198997919303,724.2735952297198],...,[291.56194269499326,152.00741026461324,97.66508329384772,134.39914740680638,351.46495384672033,...,215.13185767180696,595.3714261275535,821.5498971286825,543.2347171654031,253.0259260929403],[6.746127367423337,9.566743286589825,27.659285595195147,38.112883906122526,25.0453695639254,...,23.05403759079097,65.16548236458307,89.85554183215632,59.229313787334775,28.22216552802012]]
...

Aggregating by year and optimizing the output#

The following block takes all the sector files and aggregates them by year. This is based on the expectation that most users will work on the latest year, and that some users will want to look into the trends across the years.

Since these files will be read many times (every time we want to draw a graph), it pays off to optimize them. The Parquet format is designed for fast reads of the relevant data. We will apply three main optimizations: compressing the data, optimizing the row groups, and adding statistics.

Compression Parquet allows data to be compressed column by column. The first intuition is that each column, taken separately, contains more regular patterns and thus offers more opportunities for compression. The second intuition is that, in data-intensive applications, reading the data is the bottleneck. It is then faster to read smaller, compressed data into memory and decompress it (spending a bit of compute time) than to read larger, uncompressed data. Modern compression algorithms such as ZStandard or LZ4 are designed to make very efficient use of a processor. Using them is essentially a pure gain in terms of processing speed.
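As a rough illustration of the first intuition, the sketch below compresses three made-up columns one at a time. It uses zlib from the Python standard library as a stand-in for ZStandard, and the column names only mirror the dataset: the values are synthetic and the layout is a simplification of what a Parquet writer does.

```python
import random
import struct
import zlib

random.seed(0)
n_rows = 10_000

# Toy columns (synthetic data): an enumeration, a sorted category, a measurement.
columns = {
    "gas": "".join(["co2"] * n_rows).encode(),
    "sector": "".join(
        sorted(random.choice(["agriculture", "power", "waste"]) for _ in range(n_rows))
    ).encode(),
    "emissions_quantity": struct.pack(
        f"<{n_rows}d", *(random.uniform(0, 1000) for _ in range(n_rows))
    ),
}

ratios = {}
for name, raw in columns.items():
    compressed = zlib.compress(raw)
    # Compression is lossless: decompressing gives back the original bytes.
    assert zlib.decompress(compressed) == raw
    ratios[name] = len(compressed) / len(raw)
    print(f"{name:20s} raw={len(raw):7d} bytes  compressed={len(compressed):7d} bytes")
```

The categorical columns shrink to a tiny fraction of their size, while the column of random floating-point numbers barely compresses. Real measurements are usually less random than this toy column, but the contrast shows why enumerations are such good candidates.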

TODO

The year of a data record is defined by its start time. This may differ from the convention used by Climate TRACE. To be checked.
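The convention can be spelled out on a single record. This is a minimal sketch: the helper record_year and the record itself are hypothetical, chosen so that the start and end fall in different calendar years.

```python
from datetime import datetime, timezone


def record_year(start_time: datetime) -> int:
    """Year a record is assigned to: the calendar year of its start time."""
    return start_time.year


# Hypothetical record that straddles two calendar years.
start = datetime(2023, 12, 15, tzinfo=timezone.utc)
end = datetime(2024, 1, 14, tzinfo=timezone.utc)

# The record is filed under 2023 even though it ends in 2024.
print(record_year(start), end.year)
```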

@data_function("/ct_pre")
def ct_pre():
    write_directory = os.path.join(tempfile.gettempdir(), "ct_pre")
    data_files = [str(p) for p in load_sources()]
    duckdb.sql("""
    COPY
          (SELECT *,date_part('year', start_time) AS year FROM read_parquet({data_files}))
    TO '{tmp_dir}' (FORMAT PARQUET, PARTITION_BY (gas,year), CODEC 'zstd', OVERWRITE_OR_IGNORE)
    """.format(data_files=str(data_files), tmp_dir=str(write_directory))
    )
    return write_directory

ct_pre()
'/media/tjhunter/DATA/temp/ct_pre'
def _write_source_file(gas, year, ct_pre_fname):
    logger = logging.getLogger(__name__)
    tmp_dir = tempfile.gettempdir()
    ct_pre_pq = os.path.join(ct_pre_fname, f"gas={gas}", f"year={year}")
    local_pq = os.path.join(tmp_dir, "temp.parquet")
    logger.debug("writing source file for year=%s gas=%s %s", year, gas, local_pq)
    (pl.scan_parquet(ct_pre_pq)
     .pipe(ct.data.recast_parquet, conf=True)
     .sort(by=[SUBSECTOR])
     .sink_parquet(local_pq,
        compression="zstd",
        maintain_order=True,
        statistics=True,
        compression_level=2,
        row_group_size=300_000,
        data_page_size=10_000_000
        )
    )
    version = ct.data.version
    fname = os.path.join(tmp_dir,
                         "climate_trace_sources",
                         f"climate_trace-sources_{version}_{year}_{gas}.parquet") 
    logger.debug("final source file: %s", fname)
    ds = pyarrow.dataset.dataset(local_pq)
    arrow_path = os.path.join(tmp_dir, "arrow_tmp")
    pyarrow.dataset.write_dataset(
        ds,
        base_dir=arrow_path,
        basename_template="ds_{i}.parquet",
        format="parquet",
        partitioning=None,
        min_rows_per_group=300_000,
        max_rows_per_group=1_000_000,
        existing_data_behavior='overwrite_or_ignore'
    )
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    shutil.copyfile(os.path.join(arrow_path, "ds_0.parquet"), fname)
    return fname
years = ct.data.years
gases = ct.constants.GAS_LIST

@data_function("/write_sources")
def write_sources():
    ct_pre_fname = ct_pre()
    fnames = []
    for gas in gases:
        for year in years:
            fname = _write_source_file(gas,year, ct_pre_fname)
            fnames.append(fname)
    return fnames

write_sources()
['/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2021_co2.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2022_co2.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2023_co2.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2024_co2.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2021_ch4.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2022_ch4.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2023_ch4.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2024_ch4.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2021_n2o.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2022_n2o.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2023_n2o.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2024_n2o.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2021_co2e_100yr.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2022_co2e_100yr.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2023_co2e_100yr.parquet',
 '/media/tjhunter/DATA/temp/climate_trace_sources/climate_trace-sources_v3-2024-ct5_2024_co2e_100yr.parquet']

Optimizing row groups A Parquet file is a collection of groups of rows, and within each group the rows are organized column-wise along with some statistics. We can choose how many groups to create: the minimum is a single group holding all the data, which is the most common layout. This is not optimal, however: such a file can only be read by one processor core at a time, and any other cores sit idle. This is why it is better to choose a number of groups close to the expected number of processor cores (10-100). When reading, each core will process a different chunk of the file in parallel.

As of December 2024, Polars offers less control over this layout, so the _write_source_file function shown earlier restructures the final file by calling pyarrow directly, through the function pyarrow.dataset.write_dataset.
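To make the idea of one worker per row group concrete, here is a small standard-library sketch. It does not read Parquet: the column is a plain Python list, split_into_row_groups is a hypothetical helper, and the thread pool only illustrates the structure (in pure Python, threads will not give a real speed-up for this kind of work).

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_row_groups(values, n_groups):
    """Cut a column into n_groups contiguous chunks of near-equal size."""
    size, extra = divmod(len(values), n_groups)
    groups, start = [], 0
    for i in range(n_groups):
        stop = start + size + (1 if i < extra else 0)
        groups.append(values[start:stop])
        start = stop
    return groups


emissions = list(range(1_000))  # stand-in for one column of the file
row_groups = split_into_row_groups(emissions, n_groups=8)

# Each worker handles one row group; partial results are combined at the end.
with ThreadPoolExecutor(max_workers=8) as pool:
    partial_sums = list(pool.map(sum, row_groups))

total = sum(partial_sums)
print(len(row_groups), total)
```

With a single row group there would be only one task to hand out, which is the situation described above where the remaining cores have nothing to do.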

Here is the Parquet file produced directly by Polars. It is the result of joining datasets which are themselves the result of reading many files (one per subsector). It is very fragmented (see the num_row_groups statistic below).

fname_pre = os.path.join(tempfile.gettempdir(), "temp.parquet")
fname_post = write_sources()[-1]
parquet_file = pyarrow.parquet.ParquetFile(fname_pre)
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x7d34c40918a0>
  created_by: Polars
  num_columns: 55
  num_rows: 15184500
  num_row_groups: 32
  format_version: 1.0
  serialized_size: 182217

The final file is more compact: only 48 row groups. It will be much faster to read (up to 50 times faster on my computer) because readers do not need to gather information from a large number of small row groups.

parquet_file = pyarrow.parquet.ParquetFile(fname_post)
parquet_file.metadata
<pyarrow._parquet.FileMetaData object at 0x7d34ac15e160>
  created_by: parquet-cpp-arrow version 18.1.0
  num_columns: 55
  num_rows: 15184500
  num_row_groups: 48
  format_version: 2.6
  serialized_size: 275828

Statistics Each row group in a Parquet file has statistics. For each column, these statistics contain basic information such as the minimum, the maximum, etc., as you can see below. During a query, a data system first reads these statistics to decide which blocks of data it should read.

For example, the first row group only contains sectors that fall between agriculture and mineral-extraction in alphabetical order (which you can infer from min: agriculture and max: mineral-extraction). As a result, if a query is looking for waste data, it can safely skip this entire block.

Grouping the rows and creating statistics can dramatically reduce the amount of data being read and processed. Finding the right number of groups is a tradeoff between using more cores to read the data in parallel, and not having to read too many statistics descriptions. In the extreme case of the file created by Polars (5000 row groups), the statistics make up 40% of the file and can take up to 90% of the processing time! If your parquet file reads slowly, it is probably due to its internal layout.

parquet_file = pyarrow.parquet.ParquetFile(fname_post)
parquet_file.metadata.row_group(0).column(2).statistics
<pyarrow._parquet.Statistics object at 0x7d34ac15f6f0>
  has_min_max: True
  min: agriculture
  max: mineral-extraction
  null_count: 0
  distinct_count: None
  num_values: 327680
  physical_type: BYTE_ARRAY
  logical_type: String
  converted_type (legacy): UTF8
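The skipping logic can be sketched in a few lines of plain Python. This is not how pyarrow or Polars implement it: make_row_group and groups_to_read are hypothetical helpers, and the sector values are toy data, but the min/max test is the same idea.

```python
def make_row_group(values):
    """Toy row group: the values of one column plus its min/max statistics."""
    return {"values": values, "min": min(values), "max": max(values)}


def groups_to_read(row_groups, wanted):
    """Indices of the row groups whose [min, max] range may contain `wanted`."""
    return [i for i, g in enumerate(row_groups) if g["min"] <= wanted <= g["max"]]


# Toy file sorted by sector and cut into three row groups.
row_groups = [
    make_row_group(["agriculture", "buildings", "mineral-extraction"]),
    make_row_group(["power", "transportation"]),
    make_row_group(["waste", "waste"]),
]

print(groups_to_read(row_groups, "waste"))
print(groups_to_read(row_groups, "manufacturing"))
```

Looking for waste only touches the last group. Looking for manufacturing still selects the first group even though it holds no such row: statistics can only prove that a value is absent, so a selected group must still be scanned. Sorting the data before writing keeps the min/max ranges narrow and makes skipping more effective.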

Initial checks#

We now check that it works correctly. Let’s load the newly created data for the year 2023, instead of the default version stored on the internet.

source_path = tempfile.gettempdir()
sdf = ct.read_source_emissions(gas=CO2, year=2023, p=source_path)
sdf

NAIVE QUERY PLAN

run LazyFrame.show_graph() to see the optimized version

polars_query
p1: WITH COLUMNS [col("conf_source_type").cast(Enum(Some(local), Physical)).alias("conf_source_type"), col("conf_capacity").cast(Enum(Some(local), Physical)).alias("conf_capacity"), col("conf_capacity_factor").cast(Enum(Some(local), Physical)).alias("conf_capacity_factor"), col("conf_activity").cast(Enum(Some(local), Physical)).alias("conf_activity"), col("conf_emissions_factor").cast(Enum(Some(local), Physical)).alias("conf_emissions_factor"), col("conf_emissions_quantity").cast(Enum(Some(local), Physical)).alias("conf_emissions_quantity")]
p2: WITH COLUMNS [col("iso3_country").strict_cast(Enum(Some(local), Physical)).alias("iso3_country"), col("gas").cast(Enum(Some(local), Physical)).alias("gas"), col("temporal_granularity").strict_cast(Enum(Some(local), Physical)).alias("temporal_granularity"), col("subsector").strict_cast(Enum(Some(local), Physical)).alias("subsector"), col("sector").strict_cast(Enum(Some(local), Physical)).alias("sector")]
p3: Parquet SCAN [/media/tjhunter/DATA/temp/v3-2024-ct5/climate_trace-sources_v3-2024-ct5_2023_co2.parquet] π */55
edges: p1--p2, p2--p3

About 15M records for this year. They are spread across multiple gases and, in the case of boats or airplanes, across multiple trips.

sdf.select(pl.len()).collect()
shape: (1, 1)
len
u32
15184500

Check the number of distinct source IDs

by_sec = (sdf
.group_by(SOURCE_ID, SECTOR)
.agg(pl.len())
.collect())

The number of sources outside forestry and land use:

by_sec.filter(c_sector != FORESTRY_AND_LAND_USE).select(pl.len())
shape: (1, 1)
len
u32
749594

Check: no source is associated with multiple sectors.

by_sec.group_by(SOURCE_ID).agg(c_sector.n_unique()).filter(pl.col(SECTOR) > 1)
shape: (0, 2)
┌───────────┬────────┐
│ source_id ┆ sector │
│ ---       ┆ ---    │
│ u64       ┆ u32    │
╞═══════════╪════════╡
└───────────┴────────┘

Check: no annual source should be duplicated by gas. This used to happen in the V2 release.

(sdf
.filter(c_temporal_granularity =="annual")
.group_by(SOURCE_ID, GAS)
.agg(pl.len())
.filter(pl.col("len") > 1)
.sort(by="len")
.collect())
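shape: (0, 3)
┌───────────┬──────┬─────┐
│ source_id ┆ gas  ┆ len │
│ ---       ┆ ---  ┆ --- │
│ u64       ┆ enum ┆ u32 │
╞═══════════╪══════╪═════╡
└───────────┴──────┴─────┘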

Check: emissions should always be defined. V2 used to have empty values.

sdf = ct.read_source_emissions(CO2E_100YR, 2023, source_path)
(sdf
 .select(c_emissions_quantity.is_null().alias("null_emissions"), c_subsector, c_iso3_country)
 .group_by(c_subsector, "null_emissions")
 .agg(pl.len())
 .collect()
 .pivot(index=SUBSECTOR, on="null_emissions", values="len")
)
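shape: (48, 2)
┌───────────────────────────────────┬────────┐
│ subsector                         ┆ false  │
│ ---                               ┆ ---    │
│ enum                              ┆ u32    │
╞═══════════════════════════════════╪════════╡
│ "removals"                        ┆ 721668 │
│ "cropland-fires"                  ┆ 688584 │
│ "other-manufacturing"             ┆ 4776   │
│ "bauxite-mining"                  ┆ 3168   │
│ "iron-and-steel"                  ┆ 10728  │
│ "water-reservoirs"                ┆ 84408  │
│ "iron-mining"                     ┆ 8664   │
│ "forest-land-fires"               ┆ 647772 │
│ "manure-management-cattle-opera…" ┆ 932736 │
│ "international-shipping"          ┆ 86256  │
└───────────────────────────────────┴────────┘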

Integrity checks#

Before uploading and publishing data, it is a good idea to run a number of checks. Frameworks such as pandera are very helpful to implement these checks. Here we just check that Akrotiri and Dhekelia (country code XAD) is not included, as mentioned in the documentation. It used to be included in older data releases.

(ct.read_source_emissions(gas=GAS_LIST, year=years, p=source_path)
 .filter(c_iso3_country == "XAD")
 .select(pl.len())
.collect())
shape: (1, 1)
len
u32
0

CO2e subsector data should be a superset of all sectors#

Here is a sanity check worth running for any data release: one would expect the total CO2e_100yr (total emissions expressed as their CO2 equivalent) to be present for at least every sector in which emissions are reported. This was not the case until 2024-12-01 and has been fixed since then.

with pl.Config(tbl_rows=20):
    print(ct.read_source_emissions(gas=GAS_LIST, year=years, p=source_path)
     .group_by(c_sector, c_subsector, c_gas)
     .agg(c_emissions_quantity.sum())
     .collect(streaming=True)
     .pivot(GAS, index=[SECTOR, SUBSECTOR])
     .filter(pl.col(CO2E_100YR).is_null())
     .filter((pl.col(N2O) != 0) | (pl.col(CH4) != 0) | (pl.col(CO2) != 0))
    )
shape: (0, 6)
┌────────┬───────────┬─────┬─────┬────────────┬─────┐
│ sector ┆ subsector ┆ co2 ┆ ch4 ┆ co2e_100yr ┆ n2o │
│ ---    ┆ ---       ┆ --- ┆ --- ┆ ---        ┆ --- │
│ enum   ┆ enum      ┆ f64 ┆ f64 ┆ f64        ┆ f64 │
╞════════╪═══════════╪═════╪═════╪════════════╪═════╡
└────────┴───────────┴─────┴─────┴────────────┴─────┘

Create parquet files for country emissions#

As of V3, country emission data is also large enough that it should be compacted in parquet files. Note the dramatic difference:

  • uncompressed CSV file: 106MB

  • compressed CSV file: 6MB

  • parquet: 2MB !!

As highlighted, the parquet file also has the advantage of being very efficient at extracting only the relevant information.

# Starting from the official archives, read all the gases.

@data_function("/read_country")
def read_country():
    path = Path(tempfile.gettempdir()) / f"climate-trace-countries-{ct.data.version}.parquet"
    print(path)
    cdf = ct.read_country_emissions(ct.constants.GAS_LIST, archive_path=True)
    # Optimizing to read by time and then gas.
    # The logic being that country-specific files are already available from CT.
    (cdf
     .sort(by=[c_start_time, c_gas, c_iso3_country])
     .write_parquet(path)  # Not taking precautions, the file is so small.
    )
    return path

p = read_country()

Country emissions: integrity checks#

In a production pipeline, before uploading the final data, we would run a number of checks again on the country emissions. Here are a few checks that we can run; some of them used to fail on earlier releases.

cdf = ct.read_country_emissions(parquet_path=p)
cdf.head(2)
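shape: (2, 11)
┌──────────────┬─────────────────────────┬─────────────────────────┬───────┬──────────────────────────┬────────────────────────────────┬────────────────────┬──────────────────────────┬──────────────────────┬───────────────────┬───────────────────┐
│ iso3_country ┆ start_time              ┆ end_time                ┆ gas   ┆ sector                   ┆ subsector                      ┆ emissions_quantity ┆ emissions_quantity_units ┆ temporal_granularity ┆ created_date      ┆ modified_date     │
│ ---          ┆ ---                     ┆ ---                     ┆ ---   ┆ ---                      ┆ ---                            ┆ ---                ┆ ---                      ┆ ---                  ┆ ---               ┆ ---               │
│ enum         ┆ datetime[ms, UTC]       ┆ datetime[ms, UTC]       ┆ enum  ┆ enum                     ┆ enum                           ┆ f64                ┆ cat                      ┆ enum                 ┆ datetime[ms, UTC] ┆ datetime[ms, UTC] │
╞══════════════╪═════════════════════════╪═════════════════════════╪═══════╪══════════════════════════╪════════════════════════════════╪════════════════════╪══════════════════════════╪══════════════════════╪═══════════════════╪═══════════════════╡
│ "ABW"        ┆ 2015-01-01 00:00:00 UTC ┆ 2015-12-31 00:00:00 UTC ┆ "co2" ┆ "fossil-fuel-operations" ┆ "other-fossil-fuel-operations" ┆ 0.0                ┆ null                     ┆ "annual"             ┆ null              ┆ null              │
│ "ABW"        ┆ 2015-01-01 00:00:00 UTC ┆ 2015-12-31 00:00:00 UTC ┆ "co2" ┆ "mineral-extraction"     ┆ "bauxite-mining"               ┆ 0.0                ┆ null                     ┆ "annual"             ┆ null              ┆ null              │
└──────────────┴─────────────────────────┴─────────────────────────┴───────┴──────────────────────────┴────────────────────────────────┴────────────────────┴──────────────────────────┴──────────────────────┴───────────────────┴───────────────────┘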

Country emissions: CO2e data should be a superset of all country emissions#

This was an issue as of 2024-12 and has been fixed since then.

with pl.Config(tbl_rows=20):
    print(cdf
     .group_by(c_sector, c_subsector, c_gas)
     .agg(c_emissions_quantity.sum())
     .sort(by=[c_sector, c_subsector, c_gas])
     .pivot(GAS, index=[SECTOR, SUBSECTOR])
     .filter(pl.col(CO2E_100YR).is_null())
     .filter(pl.col(CO2) != 0)
    )
shape: (0, 6)
┌────────┬───────────┬─────┬─────┬─────┬────────────┐
│ sector ┆ subsector ┆ co2 ┆ ch4 ┆ n2o ┆ co2e_100yr │
│ ---    ┆ ---       ┆ --- ┆ --- ┆ --- ┆ ---        │
│ enum   ┆ enum      ┆ f64 ┆ f64 ┆ f64 ┆ f64        │
╞════════╪═══════════╪═════╪═════╪═════╪════════════╡
└────────┴───────────┴─────┴─────┴─────┴────────────┘

Country emissions: some countries are excluded from the dataset#

The Climate TRACE documentation excludes certain countries from the final release. They used to be present as of 2024-12.

excluded_isos = ["XAD", "XCL", "XPI", "XSP"]
(cdf
 .filter(c_iso3_country.is_in(excluded_isos))
 .group_by([ISO3_COUNTRY, c_start_time.dt.year(), GAS, SECTOR, SUBSECTOR])
 .agg(pl.len()))
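shape: (0, 6)
┌──────────────┬────────────┬──────┬────────┬───────────┬─────┐
│ iso3_country ┆ start_time ┆ gas  ┆ sector ┆ subsector ┆ len │
│ ---          ┆ ---        ┆ ---  ┆ ---    ┆ ---       ┆ --- │
│ enum         ┆ i32        ┆ enum ┆ enum   ┆ enum      ┆ u32 │
╞══════════════╪════════════╪══════╪════════╪═══════════╪═════╡
└──────────────┴────────────┴──────┴────────┴───────────┴─────┘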

Upload the data to the Hugging Face Hub#

As a final step, we make the datasets available on Hugging Face as a downloadable dataset.

This step will only work if you have the credentials to upload the dataset.

upload = False
if upload:
    try:
        api = huggingface_hub.HfApi()
        for fpath in write_sources():
            fname = os.path.join(ct.data.version, os.path.basename(fpath))
            print(fname, fpath)
            api.upload_file(
                path_or_fileobj=fpath,
                path_in_repo=fname,
                repo_id="tjhunter/climate-trace",
                repo_type="dataset",
            )
        fpath = read_country()
        fname = os.path.basename(fpath)
        print(fname, fpath)
        api.upload_file(
            path_or_fileobj=fpath,
            path_in_repo=fname,
            repo_id="tjhunter/climate-trace",
            repo_type="dataset",
        )
    except huggingface_hub.utils.HfHubHTTPError as e:
        print("error")
        print(e)