Data Setup¶

This notebook aggregates the data used for analysis: EM27 oof data with side-by-side corrections, averaging kernels, and met data. For the Meyer 2025 paper (https://doi.org/10.1029/2025JD044398), this notebook creates the datasets available at https://doi.org/10.1029/2025JD044398. These datasets are subsequently used in the EM27_ratios.ipynb and Inventory.ipynb demonstrations. I create the datasets for both of the EM27s I use in the SLV, but the paper only uses the 'ha' instrument.

In [ ]:
#Import Packages
import sys
import pickle
import os
import pytz
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker
import matplotlib.dates as mdates
import matplotlib.gridspec as gridspec
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import xarray as xr
import warnings
import json

sys.path.append('/uufs/chpc.utah.edu/common/home/u0890904/LAIR_1/')
from atmos.utils import met_utils, datetime_utils, plot_utils, regression_utils, df_utils, ac_utils

pd.options.mode.chained_assignment = None
warnings.filterwarnings("ignore")
%load_ext autoreload
%autoreload 2
In [ ]:
#Define some global variables like the project path, timezone, etc.
project_path = '/uufs/chpc.utah.edu/common/home/u0890904/public_html/Projects/SLV_EM27_2025'
project_data_path = os.path.join(project_path,'Data')
timezone = 'US/Mountain'

EM27 Data Setup¶

In [ ]:
#Define a plotting function
def plot_side_by_side(merged_oof_df, corr_gases, true_inst, corr_inst, regressions, timezone, title_id,summary_keys=None):
    dates = sorted(list(set(merged_oof_df.index.date)))
    broken_dtranges = []
    for date in dates:
        dtstart_str = f'{date} 08:00:00'
        dtend_str = f'{date} 21:00:00'
        broken_dtranges.append([pd.to_datetime(dtstart_str).tz_localize(timezone), pd.to_datetime(dtend_str).tz_localize(timezone)])
    # Find the largest difference between consecutive days
    date_diffs = [(dates[i+1] - dates[i]).days for i in range(len(dates) - 1)]
    max_diff_index = date_diffs.index(max(date_diffs)) + 1  # +1 to get the index of the day after the largest gap

    fig = plt.figure(figsize=(20, 10))
    gs = gridspec.GridSpec(len(corr_gases), 8, width_ratios=[.8, .8, .8, .8, .8, .8, .2, 1])
    true_inst_color = 'tomato'
    corr_inst_color = 'deepskyblue'
    em27_marker_size = 3
    gridalpha = 0.2
    labsize = 12
    d = .01  # how big to make the diagonal lines in axes coordinates

    for row in range(len(corr_gases)):
        gas = corr_gases[row]
        for col in range(len(broken_dtranges)):
            ax = plt.subplot(gs[row, col])
            ax.scatter(merged_oof_df.index, merged_oof_df[f'{gas}_{true_inst}'], color=true_inst_color, s=em27_marker_size)
            ax.scatter(merged_oof_df.index, merged_oof_df[f'{gas}_{corr_inst}'], color=corr_inst_color, s=em27_marker_size)
            ax.set_xlim(broken_dtranges[col][0], broken_dtranges[col][1])
            if col < len(broken_dtranges) - 1:
                if col != max_diff_index - 1:
                    ax.spines['right'].set_visible(False)
                ax.yaxis.tick_left()
            if col > 0:
                ax.yaxis.set_visible(False)
                if col != max_diff_index:
                    ax.spines['left'].set_visible(False)
                ax.yaxis.tick_right()
            if col < len(broken_dtranges) - 1:
                kwargs = dict(transform=ax.transAxes, color='k', clip_on=False)
                ax.plot((1 - d, 1 + d), (-d, +d), **kwargs)  # bottom-right diagonal break mark
                ax.plot((1 - d, 1 + d), (1 - d, 1 + d), **kwargs)  # top-right diagonal break mark
            if col > 0:
                kwargs = dict(transform=ax.transAxes, color='k', clip_on=False)
                ax.plot((-d, d), (-d, +d), **kwargs)  # bottom-left diagonal break mark
                ax.plot((-d, d), (1 - d, 1 + d), **kwargs)  # top-left diagonal break mark
            ax.get_xaxis().set_ticklabels([])
            ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
            ax.set_ylabel(gas, size=labsize)
            ax.tick_params(labelsize=labsize - 2)
            if (row == 0) & (col == len(broken_dtranges) - 1):  # legend in the top-right time series panel
                ax.scatter([], [], color=true_inst_color, s=30, label=f'EM27_{true_inst}')
                ax.scatter([], [], color=corr_inst_color, s=30, label=f'EM27_{corr_inst}')
                ax.legend(fontsize=labsize - 1)
            if row == len(corr_gases) - 1:  # only label the x axis on the bottom row
                ax.tick_params(labelsize=labsize - 2)
                ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
                ax.xaxis.set_major_formatter(mdates.DateFormatter('%H', tz=pytz.timezone(timezone)))
                ax.set_xlabel(f"{broken_dtranges[col][0].strftime('%Y-%m-%d')} {merged_oof_df.index[0].strftime('%Z')}", size=labsize)
        ax = plt.subplot(gs[row, len(broken_dtranges)])
        ax.set_visible(False)
        ax = plt.subplot(gs[row, len(broken_dtranges) + 1])
        x = f'{gas}_{corr_inst}'
        y = f'{gas}_{true_inst}'
        ax.scatter(merged_oof_df[x], merged_oof_df[y], color='k', s=2)
        #plot_utils.plot_reg_on_ax(ax, regressions[gas], labsize=labsize, color='grey')
        
        plot_utils.add_single_regression_to_ax(ax,merged_oof_df,x,y,regressions[gas],summary_keys=summary_keys)
        ax.set_xlabel(x, size=labsize)
        ax.set_ylabel(y, size=labsize)
        ax.tick_params(labelsize=labsize - 2)
    plt.subplots_adjust(hspace=0.3, wspace=0.1)
    fig.suptitle(f'Side By Side ({title_id}): {true_inst} , {corr_inst}', y=.93)
    fig.show()
In [ ]:
#Define the oof paths
em27_data_path = '/uufs/chpc.utah.edu/common/home/lin-group24/agm/EM27' #This holds the entire EM27 dataset and GGG/EGI retrieval results.
project_oof_path = os.path.join(project_path,'Data','EM27_oof') #This holds just the oof files for convenience. 
In [ ]:
#Define the site locations and instrument details (when and where they were deployed)
site_locs = {'WBB':{'lat':40.768,'lon':-111.854,'masl':1464.9},
             'DBK':{'lat':40.538,'lon':-112.070,'masl':1584},
             'UUSYR':{'lat':41.089,'lon':-112.119,'masl':1286}}

inst_details = {
    'ha':[{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2022-05-01 00:00:00', '2025-02-01 00:00:00', timezone)}],
    'ua':[{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2023-07-08 11:00:00', '2023-07-11 23:59:59', timezone)},
           {'site':'DBK', 'dtr': datetime_utils.DateTimeRange('2023-07-12 00:00:00', '2023-08-11 23:59:59', timezone)},
           {'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2023-08-12 00:00:00', '2023-08-14 23:59:59', timezone)},
           {'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2024-07-20 00:00:00', '2024-07-25 23:59:59', timezone)},
           {'site':'UUSYR', 'dtr': datetime_utils.DateTimeRange('2024-07-23 00:00:00', '2024-08-31 23:59:59', timezone)},
           {'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2024-09-01 00:00:00', '2024-09-07 23:59:59', timezone)},
           ],
}

inst_ids = list(inst_details.keys())
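
For illustration, the hypothetical lookup below shows how these deployment records can be queried for a timestamp of interest. It assumes DateTimeRange exposes tz-aware start_dt and end_dt attributes, as the loading cells later in this notebook do.

In [ ]:
#Illustrative only: look up which site the 'ua' instrument occupied at a given time
#Assumes DateTimeRange exposes tz-aware start_dt/end_dt attributes (as used in the loading cells below)
query_dt = pd.Timestamp('2023-07-20 12:00:00', tz=timezone)
ua_sites = [det['site'] for det in inst_details['ua'] if det['dtr'].start_dt <= query_dt <= det['dtr'].end_dt]
print(ua_sites) #expect ['DBK'] for this example date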
In [ ]:
#Copy all of the oof files to a single folder for each instrument
for inst_id in inst_ids:
    inst_oof_cp_path = os.path.join(project_oof_path,inst_id)
    if not os.path.exists(inst_oof_cp_path):
        os.makedirs(inst_oof_cp_path)
    if len(os.listdir(inst_oof_cp_path)) > 0:
        raise ValueError(f'{inst_oof_cp_path} is not empty; you may end up overwriting existing oof data')
    inst_oof_og_path = os.path.join(em27_data_path,inst_id,'results')
    ac_utils.copy_em27_oofs_to_singlefolder(inst_oof_og_path,inst_oof_cp_path)
In [ ]:
#Load all of the EM27 data defined in the instrument details
cols_to_load = ['flag','solzen(deg)','xco(ppb)','xco(ppb)_error','xch4(ppm)','xch4(ppm)_error','xco2(ppm)','xco2(ppm)_error'] #pare down the dfs to just these cols
cols_to_keep = ['solzen(deg)','site','xco(ppm)','xco(ppm)_error','xch4(ppm)','xch4(ppm)_error','xco2(ppm)','xco2(ppm)_error'] #cols to keep in the final df
good_flags = [0,23] #flags considered "good" here: 0 is the default good flag; 23 is set when NaNs appear in the WSPD column (we get wind data elsewhere)
oof_dfs = {} #store the oof dfs in a dictionary by instrument
for inst_id in inst_ids: 
    oof_df = pd.DataFrame()
    inst_oof_path = os.path.join(project_oof_path,inst_id)
    my_oof_manager = ac_utils.oof_manager(inst_oof_path,timezone)
    inst_det_list = inst_details[inst_id]
    for inst_det in inst_det_list:
        site = inst_det['site']
        dtr = inst_det['dtr']
        dt1 = dtr.start_dt
        dt2 = dtr.end_dt
        new_oof_df = my_oof_manager.load_oof_df_inrange(dt1,dt2,filter_flag_0 = False,cols_to_load = cols_to_load,print_out = True) #load the oof data in the specified range
        new_oof_df = new_oof_df[new_oof_df['flag'].isin(good_flags)] #filter by the good flags
        for col in new_oof_df.columns: #convert the ppb columns to ppm
            if 'ppb' in col:
                new_col_name = col.replace('ppb','ppm')
                new_oof_df[new_col_name] = new_oof_df[col]/1000 
        new_oof_df['site'] = site
        new_oof_df = new_oof_df[cols_to_keep] #pare down the df to just the columns we want
        oof_df = pd.concat([oof_df,new_oof_df]) #concatenate the new df to the old one
    oof_dfs[inst_id] = oof_df #store the df in the dictionary
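
As an optional sanity check, the loaded dataframes can be summarized quickly:

In [ ]:
#Optional sanity check: summarize the date range, row count, and sites loaded for each instrument
for inst_id, df in oof_dfs.items():
    print(inst_id, df.index.min(), df.index.max(), len(df), sorted(df['site'].unique()))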

Side By Side Correction¶

First, find the correction factors for each set of side-by-side periods using linear regressions.
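
Concretely, for each gas an ordinary least squares fit of the 'true' instrument (ha) against the instrument to be corrected (ua) over the side-by-side period gives a slope and intercept, and the corrected values are then x_corrected = slope * x_raw + intercept. The regression results are stored as dictionaries with 'slope' and 'intercept' keys, which is how they are applied in the cells below.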

In [ ]:
#Find the correction factors 
true_inst = 'ha'
corr_inst = 'ua'
sbs_loc = 'WBB'
resample_interval = '5min'
corr_gases = ['xco(ppm)','xch4(ppm)','xco2(ppm)']
plot = True

sbs_total_dtranges = [datetime_utils.DateTimeRange('2023-07-08 11:00:00', '2023-08-14 23:59:59', timezone),
                      datetime_utils.DateTimeRange('2024-07-20 00:00:00', '2024-09-07 23:59:59', timezone)]
sbs_details = []
for sbs_total_dtrange in sbs_total_dtranges:
    sbs_dt_ranges = ac_utils.find_sbs_ranges_inrange(inst_details,true_inst,corr_inst,sbs_total_dtrange)[sbs_loc]
    sbs_oof_dfs = {}
    for inst_id in [true_inst,corr_inst]:
        sbs_oof_df = pd.DataFrame()
        for sbs_dtr in sbs_dt_ranges:
            new_oof_df = oof_dfs[inst_id].loc[(oof_dfs[inst_id].index >= sbs_dtr.start_dt) & (oof_dfs[inst_id].index <= sbs_dtr.end_dt)]
            sbs_oof_df = pd.concat([sbs_oof_df,new_oof_df])
        sbs_oof_dfs[inst_id] = sbs_oof_df.resample(resample_interval).mean(numeric_only=True)
    merged_oof_df = ac_utils.merge_oofdfs(sbs_oof_dfs,dropna=True)
    corr_regressions = {}
    for corr_gas in corr_gases:
        #corr_regressions[corr_gas] = ac.lin_regress_2(merged_oof_df,f'{corr_gas}_{corr_inst}',f'{corr_gas}_{true_inst}')
        corr_regressions[corr_gas] = regression_utils.ols_regression(merged_oof_df,f'{corr_gas}_{corr_inst}',f'{corr_gas}_{true_inst}')
    if plot:
        fig = plot_side_by_side(merged_oof_df, corr_gases, true_inst, corr_inst, corr_regressions, timezone, 'No Correction',summary_keys=['slope','r_squared'])
    
    corr_merged_oof_df = merged_oof_df.copy()
    for corr_gas in corr_gases:
        corr_merged_oof_df[f'{corr_gas}_{corr_inst}'] = corr_merged_oof_df.apply(
            lambda row: row[f'{corr_gas}_{corr_inst}']*corr_regressions[corr_gas]['slope'] + corr_regressions[corr_gas]['intercept']
            ,axis = 1)
        
    post_corr_regressions = {}
    for corr_gas in corr_gases:
        post_corr_regressions[corr_gas] = regression_utils.ols_regression(corr_merged_oof_df,f'{corr_gas}_{corr_inst}',f'{corr_gas}_{true_inst}')
        #ac.lin_regress_2(corr_merged_oof_df,f'{corr_gas}_{true_inst}',f'{corr_gas}_{corr_inst}')

    if plot:
        fig = plot_side_by_side(corr_merged_oof_df, corr_gases, true_inst, corr_inst, post_corr_regressions, timezone, 'With Correction',summary_keys=['slope','r_squared'])
    
    sbs_details.append({'sbs_total_dtrange':sbs_total_dtrange,
                        'corr_regressions':corr_regressions})

Now apply the corrections to the data based on the applicable datetime ranges and the regression coefficients

In [ ]:
#Apply the corrections to the entire dataset
corr_oof_dfs = {}
corr_oof_dfs[true_inst] = oof_dfs[true_inst].copy()
oof_df = oof_dfs[corr_inst].copy()
corr_oof_df = pd.DataFrame()
for sbs_detail in sbs_details:
    dtr = sbs_detail['sbs_total_dtrange']
    corr_regressions = sbs_detail['corr_regressions']
    sub_oof_df = oof_df.loc[(oof_df.index >= dtr.start_dt) & (oof_df.index <= dtr.end_dt)]
    for corr_gas in corr_gases:
        sub_oof_df[corr_gas] = sub_oof_df[corr_gas] * corr_regressions[corr_gas]['slope'] + corr_regressions[corr_gas]['intercept']
    corr_oof_df = pd.concat([corr_oof_df,sub_oof_df])
corr_oof_dfs[corr_inst] = corr_oof_df
In [ ]:
#Make sure the site labels match the deployment date ranges defined in inst_details
for inst_id, inst_det_list in inst_details.items():
    for inst_det in inst_det_list:
        site = inst_det['site']
        dtr = inst_det['dtr']
        corr_oof_dfs[inst_id].loc[(corr_oof_dfs[inst_id].index >= dtr.start_dt) & (corr_oof_dfs[inst_id].index <= dtr.end_dt), 'site'] = site

Use the cell below to make a quick plotly check plot and confirm the corrections line up as expected.

In [ ]:
#Plotly plot to check that everything lines up where it should
plot_corr_oof_dfs = {}
for inst_id,corr_oof_df in corr_oof_dfs.items():
    plot_corr_oof_dfs[inst_id] = corr_oof_dfs[inst_id].copy().resample(resample_interval).mean(numeric_only=True).dropna()
fig = make_subplots(rows=3, cols=1, subplot_titles=corr_gases,shared_xaxes=True)
for i,corr_gas in enumerate(corr_gases):
    for inst_id,plot_corr_oof_df in plot_corr_oof_dfs.items():
        if inst_id == true_inst:
            color = 'tomato'
        else:
            color = 'deepskyblue'
        fig.add_trace(go.Scatter(
            x=plot_corr_oof_df.index, 
            y=plot_corr_oof_df[corr_gas], 
            mode='markers', 
            marker_color=color,
            name=f'{inst_id}_{corr_gas}'), 
            row=i+1, col=1)
fig.show()

Write to Parquet File¶

In [ ]:
# Concat the dfs:
corr_oof_df_concat = df_utils.concat_dict_of_dfs(corr_oof_dfs,drop_index=False)

# Write to Parquet File
corr_oof_df_concat.to_parquet(os.path.join(project_data_path,'corr_oof_df_concat.parquet'),index = False)
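
As an optional check (and a minimal sketch of how the downstream notebooks can load this product), the file can be read straight back in:

In [ ]:
#Optional: read the file back to confirm it round-trips as expected
check_df = pd.read_parquet(os.path.join(project_data_path,'corr_oof_df_concat.parquet'))
print(check_df.shape)
check_df.head()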

Averaging Kernels¶

In [ ]:
# This pulls out the surface averaging kernels from the netcdf files created by the retrieval. 
# This can take a while because it has to open every netcdf file and I never streamlined the code 

xgas_names = ['xco','xch4','xco2']
surface_ak_dfs = {}
for inst_id in inst_details.keys():
    print(inst_id)
    surface_ak_df = pd.DataFrame()
    daily_path = os.path.join(em27_data_path,inst_id,'results','daily')
    for datestr in sorted(os.listdir(daily_path)):
        date_path = os.path.join(daily_path,datestr)
        print(datestr)
        try:
            nc_fname = [file for file in os.listdir(date_path) if file.endswith('.nc')][0]
        except (IndexError, NotADirectoryError): #skip days without a netCDF file
            continue
        ds = xr.open_dataset(os.path.join(date_path,nc_fname))
        surface_ak_ds = xr.Dataset()
        for xgas_name in xgas_names:
            surface_ak_interp = ac_utils.get_surface_ak(ds,xgas_name)
            surface_ak_ds[f'{xgas_name}_surf_ak'] = surface_ak_interp
        df = surface_ak_ds.to_dataframe()
        surface_ak_df = pd.concat([surface_ak_df,df])
    surface_ak_df = surface_ak_df.rename_axis('dt')
    surface_ak_df.index = surface_ak_df.index.tz_localize('UTC').tz_convert(timezone)
    surface_ak_dfs[inst_id] = surface_ak_df
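
Optionally, a quick look at one of the resulting dataframes (columns should include xco_surf_ak, xch4_surf_ak, and xco2_surf_ak on a tz-aware datetime index):

In [ ]:
#Optional: inspect the surface averaging kernels for one instrument
surface_ak_dfs['ha'].head()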
In [ ]:
# Write to parquet file
surface_ak_df_concat = df_utils.concat_dict_of_dfs(surface_ak_dfs,drop_index=False)
surface_ak_df_concat.to_parquet(os.path.join(project_data_path,'surface_ak_df_concat.parquet'),index = False)

Met Data¶

In [ ]:
# Load meteorological data directly from the met files in the EM27 data path. 

met_type = 'ggg'
mh = met_utils.MetHandler()
met_dfs = {}

for inst_id, details in inst_details.items():
    for detail in details:
        site = detail['site']
        if (inst_id == 'ua' and site == 'WBB') or (site == 'DBK'): #skip met loading for the 'ua' WBB side-by-side periods and for DBK
            continue
        print(f'Loading data for {inst_id} at {site}')
        dtr = detail['dtr']
        met_data_path = os.path.join(em27_data_path, inst_id, 'inst_data/met', site)
        
        #Load the data for the given datetime range
        met_df = mh.load_stddata_in_range(met_type, met_data_path, dtr)
        
        met_df[['u','v']] = met_df.apply(lambda row: met_utils.wswd_to_uv(row['ws'],row['wd']),axis = 1, result_type = 'expand')

        #Concatenate the dataframes for each site
        if site not in met_dfs:
            met_dfs[site] = met_df
        else:
            met_dfs[site] = pd.concat([met_dfs[site], met_df])
In [ ]:
# Write met data to parquet files
met_dfs_concat = df_utils.concat_dict_of_dfs(met_dfs,drop_index=False)
met_dfs_concat.to_parquet(os.path.join(project_data_path,'met_dfs_concat.parquet'),index = False)

HRRR Smoke¶

Prior to running this notebook, the "hrrr_smoke.py" main script should be run to download and format the HRRR smoke data. It downloads the data for the specified date range and saves it to a file named hrrr_smoke.parquet in the data folder, and it can be submitted as a batch job using slurm/jobs/hrrr_smoke.slurm.
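
For reference, a submission might look like the cell below; treat it as a sketch, since the exact path (assumed here to be relative to the project root) and any script arguments depend on how the slurm job is configured.

In [ ]:
#Sketch only: submit the HRRR smoke download as a batch job (path assumed relative to the project root)
!sbatch slurm/jobs/hrrr_smoke.slurm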

In [ ]:
smoke_df_fname = 'hrrr_smoke.parquet'
smoke_df = pd.read_parquet(os.path.join(project_data_path, smoke_df_fname))


# Calculate daily mean and max
smoke_day_df = smoke_df.groupby(smoke_df.index.date).agg({
    'tc_mdens': ['mean', 'max'],
    'mdens': ['mean', 'max'],
})
# Flatten the MultiIndex columns
smoke_day_df.columns = ['tc_mdens_mean', 'tc_mdens_max', 'mdens_mean', 'mdens_max']

# Summary statistics of the daily mean total-column smoke density (tc_mdens)
mean_tc_mdens = smoke_day_df['tc_mdens_mean'].mean()
median_tc_mdens = smoke_day_df['tc_mdens_mean'].median()
std_tc_mdens = smoke_day_df['tc_mdens_mean'].std()
max_tc_mdens = smoke_day_df['tc_mdens_mean'].max()

# Flag outlier days: daily mean (or daily max) tc_mdens greater than the median of the daily means plus one standard deviation
outliers_mean = smoke_day_df[smoke_day_df['tc_mdens_mean'] > (median_tc_mdens + std_tc_mdens)]
outliers_max = smoke_day_df[smoke_day_df['tc_mdens_max'] > (median_tc_mdens + std_tc_mdens)]

outlier_dates_mean = outliers_mean.index
outlier_dates_max = outliers_max.index

outliers = {
    "mean": outlier_dates_mean.astype(str).tolist(),
    "max": outlier_dates_max.astype(str).tolist(),
}
In [ ]:
# Save to json file
with open(os.path.join(project_data_path,'smoke_outliers.json'), 'w') as handle:
    json.dump(outliers, handle, indent=4, sort_keys=True, default=str)