Data Setup¶
This notebook aggregates the data used for analysis: EM27 oof retrievals with side-by-side corrections, surface averaging kernels, and meteorological data. For the Meyer 2025 paper (https://doi.org/10.1029/2025JD044398), this notebook creates the datasets available at https://doi.org/10.1029/2025JD044398. These datasets are subsequently used in the EM27_ratios.ipynb and Inventory.ipynb demonstrations. I create datasets for both EM27s I use in the SLV, but the paper only uses the 'ha' instrument.
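For orientation, the sketch below shows how the three parquet files written at the end of this notebook (corr_oof_df_concat.parquet, surface_ak_df_concat.parquet, and met_dfs_concat.parquet) might be loaded in a downstream notebook; the hard-coded path simply mirrors the project_data_path defined below.
#Sketch only: load the datasets produced by this notebook in a downstream notebook
import os
import pandas as pd
project_data_path = '/uufs/chpc.utah.edu/common/home/u0890904/public_html/Projects/SLV_EM27_2025/Data'
corr_oof_df_concat = pd.read_parquet(os.path.join(project_data_path,'corr_oof_df_concat.parquet'))
surface_ak_df_concat = pd.read_parquet(os.path.join(project_data_path,'surface_ak_df_concat.parquet'))
met_dfs_concat = pd.read_parquet(os.path.join(project_data_path,'met_dfs_concat.parquet'))
print(corr_oof_df_concat.columns.tolist()) #expect the xco/xch4/xco2 (ppm) columns plus 'site'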
#Import Packages
import sys
import pickle
import os
import pytz
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker
import matplotlib.dates as mdates
import matplotlib.gridspec as gridspec
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import xarray as xr
import warnings
import json
sys.path.append('/uufs/chpc.utah.edu/common/home/u0890904/LAIR_1/')
from atmos.utils import met_utils, datetime_utils, plot_utils, regression_utils, df_utils, ac_utils
pd.options.mode.chained_assignment = None
warnings.filterwarnings("ignore")
%load_ext autoreload
%autoreload 2
#Define some global variables like paths, timezone, etc.
project_path = '/uufs/chpc.utah.edu/common/home/u0890904/public_html/Projects/SLV_EM27_2025'
project_data_path = os.path.join(project_path,'Data')
timezone = 'US/Mountain'
EM27 Data Setup¶
#Define a plotting function
def plot_side_by_side(merged_oof_df, corr_gases, true_inst, corr_inst, regressions, timezone, title_id,summary_keys=None):
dates = sorted(list(set(merged_oof_df.index.date)))
broken_dtranges = []
for date in dates:
dtstart_str = f'{date} 08:00:00'
dtend_str = f'{date} 21:00:00'
broken_dtranges.append([pd.to_datetime(dtstart_str).tz_localize(timezone), pd.to_datetime(dtend_str).tz_localize(timezone)])
# Find the largest difference between consecutive days
date_diffs = [(dates[i+1] - dates[i]).days for i in range(len(dates) - 1)]
max_diff_index = date_diffs.index(max(date_diffs)) + 1 # +1 to get the index of the day after the largest gap
fig = plt.figure(figsize=(20, 10))
gs = gridspec.GridSpec(len(corr_gases), 8, width_ratios=[.8, .8, .8, .8, .8, .8, .2, 1])
true_inst_color = 'tomato'
corr_inst_color = 'deepskyblue'
em27_marker_size = 3
gridalpha = 0.2
labsize = 12
d = .01 # how big to make the diagonal lines in axes coordinates
for row in range(len(corr_gases)):
gas = corr_gases[row]
for col in range(len(broken_dtranges)):
ax = plt.subplot(gs[row, col])
ax.scatter(merged_oof_df.index, merged_oof_df[f'{gas}_{true_inst}'], color=true_inst_color, s=em27_marker_size)
ax.scatter(merged_oof_df.index, merged_oof_df[f'{gas}_{corr_inst}'], color=corr_inst_color, s=em27_marker_size)
if col == 0:
alpha = 0.8
else:
alpha = 1
ax.set_xlim(broken_dtranges[col][0], broken_dtranges[col][1])
if col < len(broken_dtranges) - 1:
if col != max_diff_index - 1:
ax.spines['right'].set_visible(False)
ax.yaxis.tick_left()
if col > 0:
ax.yaxis.set_visible(False)
if col > 0:
if col != max_diff_index:
ax.spines['left'].set_visible(False)
ax.yaxis.tick_right()
if col < len(broken_dtranges) - 1:
kwargs = dict(transform=ax.transAxes, color='k', clip_on=False)
ax.plot((1 - d, 1 + d), (-d, +d), **kwargs) # top-left diagonal
ax.plot((1 - d, 1 + d), (1 - d, 1 + d), **kwargs) # bottom-left diagonal
if col > 0:
kwargs.update(transform=ax.transAxes) # switch to the bottom axes
ax.plot((-d, d), (-d, +d), **kwargs) # top-right diagonal
ax.plot((-d, d), (1 - d, 1 + d), **kwargs) # bottom-right diagonal
ax.get_xaxis().set_ticklabels([])
ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
ax.set_ylabel(gas, size=labsize)
ax.tick_params(labelsize=labsize - 2)
if (row == 0) & (col == 5):
ax.scatter([], [], color=true_inst_color, s=30, label=f'EM27_{true_inst}')
ax.scatter([], [], color=corr_inst_color, s=30, label=f'EM27_{corr_inst}')
ax.legend(fontsize=labsize - 1)
if row == 2:
ax.tick_params(labelsize=labsize - 2)
ax.xaxis.set_major_locator(mdates.HourLocator(interval=2))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H', tz=pytz.timezone(timezone)))
ax.set_xlabel(f"{broken_dtranges[col][0].strftime('%Y-%m-%d')} {merged_oof_df.index[0].strftime('%Z')}", size=labsize)
ax = plt.subplot(gs[row, len(broken_dtranges)])
ax.set_visible(False)
ax = plt.subplot(gs[row, len(broken_dtranges) + 1])
x = f'{gas}_{corr_inst}'
y = f'{gas}_{true_inst}'
ax.scatter(merged_oof_df[x], merged_oof_df[y], color='k', s=2)
plot_utils.add_single_regression_to_ax(ax,merged_oof_df,x,y,regressions[gas],summary_keys=summary_keys)
ax.set_xlabel(x, size=labsize)
ax.set_ylabel(y, size=labsize)
ax.tick_params(labelsize=labsize - 2)
plt.subplots_adjust(hspace=0.3, wspace=0.1)
fig.suptitle(f'Side By Side ({title_id}): {true_inst} , {corr_inst}', y=.93)
fig.show()
#Define the oof paths
em27_data_path = '/uufs/chpc.utah.edu/common/home/lin-group24/agm/EM27' #This holds the entire EM27 dataset and GGG/EGI retrieval results.
project_oof_path = os.path.join(project_path,'Data','EM27_oof') #This holds just the oof files for convenience.
#Define the site locations and instrument details (when and where they were deployed)
site_locs = {'WBB':{'lat':40.768,'lon':-111.854,'masl':1464.9},
'DBK':{'lat':40.538,'lon':-112.070,'masl':1584},
'UUSYR':{'lat':41.089,'lon':-112.119,'masl':1286}}
inst_details = {
'ha':[{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2022-05-01 00:00:00', '2025-02-01 00:00:00', timezone)}],
'ua':[{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2023-07-08 11:00:00', '2023-07-11 23:59:59', timezone)},
{'site':'DBK', 'dtr': datetime_utils.DateTimeRange('2023-07-12 00:00:00', '2023-08-11 23:59:59', timezone)},
{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2023-08-12 00:00:00', '2023-08-14 23:59:59', timezone)},
{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2024-07-20 00:00:00', '2024-07-25 23:59:59', timezone)},
{'site':'UUSYR', 'dtr': datetime_utils.DateTimeRange('2024-07-23 00:00:00', '2024-08-31 23:59:59', timezone)},
{'site':'WBB', 'dtr': datetime_utils.DateTimeRange('2024-09-01 00:00:00', '2024-09-07 23:59:59', timezone)},
],
}
inst_ids = list(inst_details.keys())
#Copy all of the oof files to a single folder for each instrument
for inst_id in inst_ids:
inst_oof_cp_path = os.path.join(project_oof_path,inst_id)
if not os.path.exists(inst_oof_cp_path):
os.makedirs(inst_oof_cp_path)
if len(os.listdir(inst_oof_cp_path)) > 0:
raise ValueError('Path already exists, you may end up overwriting data')
inst_oof_og_path = os.path.join(em27_data_path,inst_id,'results')
ac_utils.copy_em27_oofs_to_singlefolder(inst_oof_og_path,inst_oof_cp_path)
#Load all of the EM27 data defined in the instrument details
cols_to_load = ['flag','solzen(deg)','xco(ppb)','xco(ppb)_error','xch4(ppm)','xch4(ppm)_error','xco2(ppm)','xco2(ppm)_error'] #pare down the dfs to just these cols
cols_to_keep = ['solzen(deg)','site','xco(ppm)','xco(ppm)_error','xch4(ppm)','xch4(ppm)_error','xco2(ppm)','xco2(ppm)_error'] #cols to keep in the final df
good_flags = [0,23] #flags considered "good" here: 0 is the default "good" flag, 23 is set when NaNs appear in the WSPD column (which we get elsewhere)
oof_dfs = {} #store the oof dfs in a dictionary by instrument
for inst_id in inst_ids:
oof_df = pd.DataFrame()
inst_oof_path = os.path.join(project_oof_path,inst_id)
my_oof_manager = ac_utils.oof_manager(inst_oof_path,timezone)
inst_det_list = inst_details[inst_id]
for inst_det in inst_det_list:
site = inst_det['site']
dtr = inst_det['dtr']
dt1 = dtr.start_dt
dt2 = dtr.end_dt
new_oof_df = my_oof_manager.load_oof_df_inrange(dt1,dt2,filter_flag_0 = False,cols_to_load = cols_to_load,print_out = True) #load the oof data in the specified range
new_oof_df = new_oof_df[new_oof_df['flag'].isin(good_flags)] #filter by the good flags
for col in new_oof_df.columns: #convert the ppb columns to ppm
if 'ppb' in col:
new_col_name = col.replace('ppb','ppm')
new_oof_df[new_col_name] = new_oof_df[col]/1000
new_oof_df['site'] = site
new_oof_df = new_oof_df[cols_to_keep] #pare down the df to just the columns we want
oof_df = pd.concat([oof_df,new_oof_df]) #concatenate the new df to the old one
oof_dfs[inst_id] = oof_df #store the df in the dictionary
Side By Side Correction¶
First, find the correction factors for each set of side-by-side measurements using regressions.
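As a minimal sketch of the correction logic (assuming, as the code below does, that the regression returns a dict with 'slope' and 'intercept'; numpy.polyfit stands in here for regression_utils.ols_regression):
#Sketch only: regress the corrected instrument against the reference and apply the linear correction
import numpy as np
def sbs_correction_sketch(df, gas, true_inst='ha', corr_inst='ua'):
    x = df[f'{gas}_{corr_inst}'].values #instrument to be corrected
    y = df[f'{gas}_{true_inst}'].values #reference instrument
    slope, intercept = np.polyfit(x, y, 1) #fit y ~ slope*x + intercept
    corrected = slope * x + intercept #map the corrected instrument onto the reference
    return {'slope': slope, 'intercept': intercept}, corrected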
#Find the correction factors
true_inst = 'ha'
corr_inst = 'ua'
sbs_loc = 'WBB'
resample_interval = '5min'
corr_gases = ['xco(ppm)','xch4(ppm)','xco2(ppm)']
plot = True
sbs_total_dtranges = [datetime_utils.DateTimeRange('2023-07-08 11:00:00', '2023-08-14 23:59:59', timezone),
datetime_utils.DateTimeRange('2024-07-20 00:00:00', '2024-09-07 23:59:59', timezone)]
sbs_details = []
for sbs_total_dtrange in sbs_total_dtranges:
sbs_dt_ranges = ac_utils.find_sbs_ranges_inrange(inst_details,true_inst,corr_inst,sbs_total_dtrange)[sbs_loc]
sbs_oof_dfs = {}
for inst_id in [true_inst,corr_inst]:
sbs_oof_df = pd.DataFrame()
for sbs_dtr in sbs_dt_ranges:
new_oof_df = oof_dfs[inst_id].loc[(oof_dfs[inst_id].index >= sbs_dtr.start_dt) & (oof_dfs[inst_id].index <= sbs_dtr.end_dt)]
sbs_oof_df = pd.concat([sbs_oof_df,new_oof_df])
sbs_oof_dfs[inst_id] = sbs_oof_df.resample(resample_interval).mean(numeric_only=True)
merged_oof_df = ac_utils.merge_oofdfs(sbs_oof_dfs,dropna=True)
corr_regressions = {}
for corr_gas in corr_gases:
corr_regressions[corr_gas] = regression_utils.ols_regression(merged_oof_df,f'{corr_gas}_{corr_inst}',f'{corr_gas}_{true_inst}')
if plot:
fig = plot_side_by_side(merged_oof_df, corr_gases, true_inst, corr_inst, corr_regressions, timezone, 'No Correction',summary_keys=['slope','r_squared'])
corr_merged_oof_df = merged_oof_df.copy()
for corr_gas in corr_gases:
corr_merged_oof_df[f'{corr_gas}_{corr_inst}'] = corr_merged_oof_df.apply(
lambda row: row[f'{corr_gas}_{corr_inst}']*corr_regressions[corr_gas]['slope'] + corr_regressions[corr_gas]['intercept']
,axis = 1)
post_corr_regressions = {}
for corr_gas in corr_gases:
post_corr_regressions[corr_gas] = regression_utils.ols_regression(corr_merged_oof_df,f'{corr_gas}_{corr_inst}',f'{corr_gas}_{true_inst}')
if plot:
fig = plot_side_by_side(corr_merged_oof_df, corr_gases, true_inst, corr_inst, post_corr_regressions, timezone, 'With Correction',summary_keys=['slope','r_squared'])
sbs_details.append({'sbs_total_dtrange':sbs_total_dtrange,
'corr_regressions':corr_regressions})
Now apply the corrections to the full dataset, using the regression coefficients from whichever side-by-side period covers each datetime range.
#Apply the corrections to the entire dataset
corr_oof_dfs = {}
corr_oof_dfs[true_inst] = oof_dfs[true_inst].copy()
oof_df = oof_dfs[corr_inst].copy()
corr_oof_df = pd.DataFrame()
for sbs_detail in sbs_details:
dtr = sbs_detail['sbs_total_dtrange']
corr_regressions = sbs_detail['corr_regressions']
sub_oof_df = oof_df.loc[(oof_df.index >= dtr.start_dt) & (oof_df.index <= dtr.end_dt)]
for corr_gas in corr_gases:
sub_oof_df[corr_gas] = sub_oof_df[corr_gas] * corr_regressions[corr_gas]['slope'] + corr_regressions[corr_gas]['intercept']
corr_oof_df = pd.concat([corr_oof_df,sub_oof_df])
corr_oof_dfs[corr_inst] = corr_oof_df
#Assign the site label over each instrument's deployment date ranges
for inst_id, inst_det_list in inst_details.items():
for inst_det in inst_det_list:
site = inst_det['site']
dtr = inst_det['dtr']
corr_oof_dfs[inst_id].loc[(corr_oof_dfs[inst_id].index >= dtr.start_dt) & (corr_oof_dfs[inst_id].index <= dtr.end_dt), 'site'] = site
Use the cell below to make a quick check plot with Plotly and confirm the corrections line up as expected.
#Plotly plot to check that everything lines up where it should
plot_corr_oof_dfs = {}
for inst_id,corr_oof_df in corr_oof_dfs.items():
plot_corr_oof_dfs[inst_id] = corr_oof_dfs[inst_id].copy().resample(resample_interval).mean(numeric_only=True).dropna()
fig = make_subplots(rows=3, cols=1, subplot_titles=corr_gases,shared_xaxes=True)
for i,corr_gas in enumerate(corr_gases):
for inst_id,plot_corr_oof_df in plot_corr_oof_dfs.items():
if inst_id == true_inst:
color = 'tomato'
else:
color = 'deepskyblue'
fig.add_trace(go.Scatter(
x=plot_corr_oof_df.index,
y=plot_corr_oof_df[corr_gas],
mode='markers',
marker_color=color,
name=f'{inst_id}_{corr_gas}'),
row=i+1, col=1)
fig.show()
Write to Parquet File¶
# Concat the dfs:
corr_oof_df_concat = df_utils.concat_dict_of_dfs(corr_oof_dfs,drop_index=False)
# Write to Parquet File
corr_oof_df_concat.to_parquet(os.path.join(project_data_path,'corr_oof_df_concat.parquet'),index = False)
Averaging Kernels¶
# This pulls out the surface averaging kernels from the netcdf files created by the retrieval.
# This can take a while because it has to open every netcdf file and I never streamlined the code
xgas_names = ['xco','xch4','xco2']
surface_ak_dfs = {}
for inst_id in inst_details.keys():
print(inst_id)
surface_ak_df = pd.DataFrame()
daily_path = os.path.join(em27_data_path,inst_id,'results','daily')
for datestr in sorted(os.listdir(daily_path)):
date_path = os.path.join(daily_path,datestr)
print(datestr)
try:
nc_fname = [file for file in os.listdir(date_path) if file.endswith('.nc')][0]
except:
continue
ds = xr.open_dataset(os.path.join(date_path,nc_fname))
surface_ak_ds = xr.Dataset()
for xgas_name in xgas_names:
surface_ak_interp = ac_utils.get_surface_ak(ds,xgas_name)
surface_ak_ds[f'{xgas_name}_surf_ak'] = surface_ak_interp
df = surface_ak_ds.to_dataframe()
surface_ak_df = pd.concat([surface_ak_df,df])
surface_ak_df = surface_ak_df.rename_axis('dt')
surface_ak_df.index = surface_ak_df.index.tz_localize('UTC').tz_convert(timezone)
surface_ak_dfs[inst_id] = surface_ak_df
# Write to parquet file
surface_ak_df_concat = df_utils.concat_dict_of_dfs(surface_ak_dfs,drop_index=False)
surface_ak_df_concat.to_parquet(os.path.join(project_data_path,'surface_ak_df_concat.parquet'),index = False)
Met Data¶
# Load meteorological data directly from the met files in the EM27 data path.
met_type = 'ggg'
mh = met_utils.MetHandler()
met_dfs = {}
for inst_id, details in inst_details.items():
for detail in details:
site = detail['site']
        if ((inst_id == 'ua') & (site == 'WBB')) | (site == 'DBK'): #parenthesized for clarity: skip the ua side-by-side data at WBB, and skip DBK
continue
print(f'Loading data for {inst_id} at {site}')
dtr = detail['dtr']
met_data_path = os.path.join(em27_data_path, inst_id, 'inst_data/met', site)
#Load the data for the given datetime range
met_df = mh.load_stddata_in_range(met_type, met_data_path, dtr)
met_df[['u','v']] = met_df.apply(lambda row: met_utils.wswd_to_uv(row['ws'],row['wd']),axis = 1, result_type = 'expand')
#Concatenate the dataframes for each site
if site not in met_dfs:
met_dfs[site] = met_df
else:
met_dfs[site] = pd.concat([met_dfs[site], met_df])
# Write met data to parquet files
met_dfs_concat = df_utils.concat_dict_of_dfs(met_dfs,drop_index=False)
met_dfs_concat.to_parquet(os.path.join(project_data_path,'met_dfs_concat.parquet'),index = False)
HRRR Smoke¶
Prior to running this notebook, the "hrrr_smoke.py" main script should be run to download and format the HRRR smoke data. This script will download the data for the specified date range and save it to a file named hrrr_smoke.parquet in the data folder. It can be run using the code in slurm/jobs/hrrr_smoke.slurm.
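A quick sanity check on that file before the cells below (a sketch; it just assumes the datetime index and the tc_mdens / mdens columns used in this section):
#Sketch only: confirm the HRRR smoke parquet exists and has the expected columns
smoke_check = pd.read_parquet(os.path.join(project_data_path, 'hrrr_smoke.parquet'))
print(smoke_check.index.min(), smoke_check.index.max()) #time coverage
print([c for c in ['tc_mdens', 'mdens'] if c not in smoke_check.columns]) #should print an empty list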
smoke_df_fname = 'hrrr_smoke.parquet'
smoke_df = pd.read_parquet(os.path.join(project_data_path, smoke_df_fname))
# Calculate daily mean and max
smoke_day_df = smoke_df.groupby(smoke_df.index.date).agg({
'tc_mdens': ['mean', 'max'],
'mdens': ['mean', 'max'],
})
# Flatten the MultiIndex columns
smoke_day_df.columns = ['tc_mdens_mean', 'tc_mdens_max', 'mdens_mean', 'mdens_max']
# Calculate statistics for mean values
mean_tc_mdens = smoke_day_df['tc_mdens_mean'].mean()
median_tc_mdens = smoke_day_df['tc_mdens_mean'].median()
std_tc_mdens = smoke_day_df['tc_mdens_mean'].std()
max_tc_mdens = smoke_day_df['tc_mdens_mean'].max()
# Flag outlier days where the daily mean or daily max exceeds the median + 1 std of the daily means
outliers_mean = smoke_day_df[smoke_day_df['tc_mdens_mean'] > (median_tc_mdens + std_tc_mdens)]
outliers_max = smoke_day_df[smoke_day_df['tc_mdens_max'] > (median_tc_mdens + std_tc_mdens)]
outlier_dates_mean = outliers_mean.index
outlier_dates_max = outliers_max.index
outliers = {
"mean": outlier_dates_mean.astype(str).tolist(),
"max": outlier_dates_max.astype(str).tolist(),
}
# Save to json file
with open(os.path.join(project_data_path,'smoke_outliers.json'), 'w') as handle:
json.dump(outliers, handle, indent=4, sort_keys=True, default=str)
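For reference, downstream notebooks can read these outlier dates back with something like the following sketch (the json just holds the 'mean' and 'max' lists of date strings written above):
#Sketch only: read the smoke outlier dates back in a downstream notebook
with open(os.path.join(project_data_path,'smoke_outliers.json'), 'r') as handle:
    smoke_outliers = json.load(handle)
outlier_dates_mean = pd.to_datetime(smoke_outliers['mean'])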