def emis_union(species, month, year,
outer_dir, inner_dir, save_dir,
outer_source='CEDS',
inner_source='MEIC',
agg_dir=None,
nmvoc_speciation=False,
mapper_path=None,
country_shp=None,
province_shp=None,
output_res=0.25,
sectors: Union[List[Literal["energy","residential","industry",
"agriculture","transportation","waste","shipping","aviation"]],
str] = "all",
region=None,
global_domain=False,
lon_min=None, lon_max=None,
lat_min=None, lat_max=None):
"""
Integrate emissions from outer (global background) and inner
(regional) inventories onto a unified output grid.
Parameters
----------
species : str
Species name. Case-insensitive. Auto-mapped via mapper.
e.g. 'SO2', 'NOx', 'CO', 'BC', 'PM2.5'
month : int
Month as integer 1-12. Auto-converts to all required formats.
year : str or int
Target year (e.g. 2017).
outer_dir : str
Directory for outer (global background) inventory files.
inner_dir : str
Directory for inner (regional) inventory files.
save_dir : str
Directory to save output NetCDF files.
outer_source : str, optional
Outer inventory type. Options: 'CEDS', 'EDGAR', 'HTAP', 'user'.
Default: 'CEDS'.
- 'CEDS' : CEDS v_2021_04_21 format
- 'EDGAR' : EDGAR v8.1 format
- 'HTAP' : HTAP v3 format
- 'user' : user-provided data (auto-checked and standardized)
inner_source : str, optional
Inner inventory type. Options: 'MEIC', 'user', 'EDGAR', 'HTAP'.
Default: 'MEIC'.
agg_dir : str, optional
Directory for aggregated sector files (HTAP for waste/shipping/aviation).
If None, these sectors are set to zero with a warning.
mapper_path : str, optional
Path to Integrated_mapper.csv.
Default: bundled cinei/data/Integrated_mapper.csv.
country_shp : str, optional
Path to country shapefile.
Default: bundled cinei/data/country.shp.
province_shp : str, optional
Path to province shapefile.
Default: bundled cinei/data/分省.shp.
output_res : float, optional
Output resolution in degrees. Options: 0.05, 0.1, 0.25, 0.5.
Default: 0.25.
region : str, optional
Region name for automatic bbox lookup.
e.g. 'China', 'Beijing', 'NCP', 'Germany', 'India'.
Call cinei.list_regions() to see all presets.
global_domain : bool, optional
If True, use global extent. Default False.
lon_min, lon_max, lat_min, lat_max : float, optional
Manual bounding box. Used when region is None and
global_domain is False.
Default: China domain (70-150E, 10-60N).
Returns
-------
str
Path to the output integrated NetCDF file.
Examples
--------
>>> import cinei
>>> # Minimal — China, CEDS outer, MEIC inner, 0.25°
>>> cinei.emis_union(
... species='SO2', month=1, year=2017,
... outer_dir='/data/CEDS',
... inner_dir='/data/MEIC/2017',
... save_dir='/data/output',
... agg_dir='/data/HTAP',
... )
>>> # EDGAR as outer inventory
>>> cinei.emis_union(
... species='NOx', month=7, year=2017,
... outer_source='EDGAR',
... outer_dir='/data/EDGAR',
... inner_dir='/data/MEIC/2017',
... save_dir='/data/output',
... )
>>> # User-provided outer data (auto-checked/standardized)
>>> cinei.emis_union(
... species='SO2', month=1, year=2017,
... outer_source='user',
... outer_dir='/data/my_inventory',
... inner_dir='/data/MEIC/2017',
... save_dir='/data/output',
... )
>>> # Region by name
>>> cinei.emis_union(
... species='SO2', month=1, year=2017,
... outer_dir='/data/CEDS',
... inner_dir='/data/MEIC/2017',
... save_dir='/data/output',
... region='Beijing',
... )
>>> # Custom resolution
>>> cinei.emis_union(
... species='CO', month=3, year=2017,
... outer_dir='/data/CEDS',
... inner_dir='/data/MEIC/2017',
... save_dir='/data/output',
... output_res=0.1,
... region='NCP',
... )
"""
year = str(year)
# ── Auto-resolve bundled data ─────────────────────────────────────
if mapper_path is None: mapper_path = get_mapper_path()
if country_shp is None: country_shp = get_country_shp()
if province_shp is None: province_shp = get_province_shp()
# ── Validate sources ──────────────────────────────────────────────
if outer_source not in SUPPORTED_OUTER:
raise ValueError(
f"[CINEI] Invalid outer_source: '{outer_source}'\n"
f" Available: {SUPPORTED_OUTER}"
)
if inner_source not in SUPPORTED_INNER:
raise ValueError(
f"[CINEI] Invalid inner_source: '{inner_source}'\n"
f" Available: {SUPPORTED_INNER}"
)
# ── Validate resolution ───────────────────────────────────────────
if output_res not in SUPPORTED_RESOLUTIONS:
raise ValueError(
f"[CINEI] Invalid output_res: {output_res}\n"
f" Available: {SUPPORTED_RESOLUTIONS}"
)
# ── Auto-convert month (accepts int, '01', 'Jan', 'January') ────────
month = _parse_month(month)
mon_name = _MONTH_NAME[month] # e.g. 'Jan'
mon_id = month - 1 # 0-based xarray index
mon_str = _MONTH_STR[month] # e.g. '01'
print(f"[CINEI] Month : {mon_str} ({mon_name})")
# ── Validate and resolve sectors ─────────────────────────────────
if sectors == "all":
active_sectors = ALL_SECTORS.copy()
else:
invalid = [s for s in sectors if s not in ALL_SECTORS]
if invalid:
raise ValueError(
f"[CINEI] Invalid sectors: {invalid}\n"
f" Available: {ALL_SECTORS}"
)
active_sectors = list(sectors)
print(f"[CINEI] Sectors : {active_sectors}")
# ── Resolve region of interest ────────────────────────────────────
_lon_min, _lon_max, _lat_min, _lat_max, region_name = get_region_bbox(
region = region,
country_shp = country_shp,
lon_min = lon_min,
lon_max = lon_max,
lat_min = lat_min,
lat_max = lat_max,
global_domain= global_domain,
)
print(f"[CINEI] ── emis_union ──────────────────────────────")
print(f"[CINEI] Species : {species}")
print(f"[CINEI] Month : {month:02d} ({mon_name})")
print(f"[CINEI] Year : {year}")
print(f"[CINEI] Outer : {outer_source}")
print(f"[CINEI] Inner : {inner_source}")
print(f"[CINEI] Resolution : {output_res}°")
print()
# ── Validate paths ────────────────────────────────────────────────
for path, name in [
(outer_dir, 'outer_dir'),
(inner_dir, 'inner_dir'),
(mapper_path, 'mapper_path'),
(country_shp, 'country_shp'),
(province_shp, 'province_shp'),
]:
_check_path(path, name)
if agg_dir:
_check_path(agg_dir, 'agg_dir')
os.makedirs(save_dir, exist_ok=True)
# ── Read mapper ───────────────────────────────────────────────────
mapper = pd.read_csv(mapper_path)
mapper = mapper.set_index('MEIC')
sp_upper = species.upper()
meic_keys = [k for k in mapper.index if k.upper() == sp_upper]
if not meic_keys:
raise ValueError(
f"[CINEI] Species '{species}' not found in mapper.\n"
f" Available: {list(mapper.index)}"
)
meic_spec = meic_keys[0]
ceds_spec = mapper.loc[meic_spec, 'CEDS']
par = mapper.loc[meic_spec, 'partition']
M = mapper.loc[meic_spec, 'weight']
V = mapper.loc[meic_spec, 'if VOC']
# ── Build output grid ─────────────────────────────────────────────
half = output_res / 2
lon_arange = np.arange(_lon_min + half, _lon_max, output_res,
dtype=np.float32)
lat_arange = np.arange(_lat_min + half, _lat_max, output_res,
dtype=np.float32)
lon_2d, lat_2d = np.meshgrid(lon_arange, lat_arange)
area = ll_area(lat_2d, output_res)
n_lat, n_lon = len(lat_arange), len(lon_arange)
# ── Read outer (global background) inventory ──────────────────────
outer_bg = _read_outer(
outer_source = outer_source,
outer_dir = outer_dir,
ceds_spec = ceds_spec,
meic_spec = meic_spec,
year = year,
mon_id = mon_id,
mon_str = mon_str,
lon_arange = lon_arange,
lat_arange = lat_arange,
output_res = output_res,
region_name = region_name,
_lon_min=_lon_min, _lon_max=_lon_max,
_lat_min=_lat_min, _lat_max=_lat_max,
)
# ── Unit conversion ───────────────────────────────────────────────
if V == 'Y':
unit_outer = outer_bg * 0.001 * area * 2678400 * 1000000 * par / M
else:
unit_outer = outer_bg * 0.001 * area * 2678400 * 1000000 * par
# ── Clip outer to outside region (China excl. Taiwan) ────────────
country = gpd.read_file(country_shp)
China_shp = country[country['CNTRY_NAME'] == 'China']
province = gpd.read_file(province_shp)
Taiwan_shp = province[province['行政区划_c'] == '台湾省']
mChina = gpd.overlay(China_shp, Taiwan_shp, how='difference')
unit_outer.rio.write_crs("epsg:4326", inplace=True)
outer_clipped = unit_outer.rio.clip(
mChina.geometry, mChina.crs, drop=False, invert=True)
# ── Aggregated sectors (waste, shipping, aviation) ────────────────
if agg_dir is not None:
print(f"[CINEI] Regridding aggregated sectors...")
ds_agg = regrid_aggregation(
mon_id=mon_id, meic_spec=meic_spec, year=year,
ceds_dir=outer_dir, htap_dir=agg_dir,
mapper_path=mapper_path, output_res=output_res,
lon_min=_lon_min, lon_max=_lon_max,
lat_min=_lat_min, lat_max=_lat_max,
)
allwst = ds_agg['waste'].values
alldoshp = ds_agg['shipping'].values
all_avi = ds_agg['aviation'].values
doagr = ds_agg['agriculture'].values
# Agriculture: use HTAP full domain — no clipping
# HTAP agriculture covers both inner and outer domain consistently
dms_agr = doagr
else:
print(f"[CINEI] ⚠️ agg_dir not provided → "
f"waste/shipping/aviation set to zero.")
zeros = np.zeros((n_lat, n_lon), dtype='float32')
allwst = zeros; alldoshp = zeros
all_avi = zeros; dms_agr = zeros
# ── Read inner (regional) inventory ──────────────────────────────
act, idt, pwr, rdt, tpt = _read_inner(
inner_source = inner_source,
inner_dir = inner_dir,
meic_spec = meic_spec,
mon_name = mon_name,
mon_str = mon_str,
month = month,
year = year,
n_lat = n_lat,
n_lon = n_lon,
lon_arange = lon_arange,
lat_arange = lat_arange,
mon_id = mon_id,
)
# ── Merge sectors (only active) ──────────────────────────────────────
pwr_union = (np.nan_to_num(outer_clipped['energy'], nan=0) + pwr
if 'energy' in active_sectors
else np.zeros((n_lat, n_lon), dtype='float32'))
res_union = (np.nan_to_num(outer_clipped['residential'], nan=0) +
np.nan_to_num(outer_clipped['solvents'], nan=0) + rdt
if 'residential' in active_sectors
else np.zeros((n_lat, n_lon), dtype='float32'))
idt_union = (np.nan_to_num(outer_clipped['industrial'], nan=0) + idt
if 'industry' in active_sectors
else np.zeros((n_lat, n_lon), dtype='float32'))
shp_union = (np.nan_to_num(outer_clipped['ships'], nan=0) + alldoshp
if 'shipping' in active_sectors
else np.zeros((n_lat, n_lon), dtype='float32'))
tpt_union = (np.nan_to_num(outer_clipped['transportation'], nan=0) + tpt
if 'transportation' in active_sectors
else np.zeros((n_lat, n_lon), dtype='float32'))
# Agriculture: use HTAP for entire domain (no clipping)
# MEIC agriculture contains NaN inside China → unreliable
# HTAP provides consistent coverage for both inner and outer domain
act_union = (doagr
if 'agriculture' in active_sectors
else np.zeros((n_lat, n_lon), dtype='float32'))
swd_union = allwst if 'waste' in active_sectors else np.zeros((n_lat, n_lon), dtype='float32')
avi_union = all_avi if 'aviation' in active_sectors else np.zeros((n_lat, n_lon), dtype='float32')
sum_union = (pwr_union + res_union + idt_union + shp_union +
swd_union + tpt_union + act_union)
# ── Build output Dataset ──────────────────────────────────────────
myds = xr.Dataset(
{"energy": (("lat", "lon"), pwr_union),
"residential": (("lat", "lon"), res_union),
"industry": (("lat", "lon"), idt_union),
"agriculture": (("lat", "lon"), act_union),
"transportation": (("lat", "lon"), tpt_union),
"waste": (("lat", "lon"), swd_union),
"shipping": (("lat", "lon"), shp_union),
"aviation": (("lat", "lon"), avi_union),
"sum": (("lat", "lon"), sum_union)},
coords={'lon': lon_arange, 'lat': lat_arange})
myds.attrs['unit'] = 'million mole/month/grid' if V=='Y' else 'ton/month/grid'
myds.attrs['resolution'] = f'{output_res}° x {output_res}°'
myds.attrs['region'] = region_name
myds.attrs['outer_source']= outer_source
myds.attrs['inner_source']= inner_source
myds.attrs['conventions'] = 'NETCDF3_CLASSIC'
myds.attrs['authors'] = 'Yijuan Zhang, University of Bremen.'
myds.attrs['title'] = (
f'CINEI integrated emissions ({region_name}) '
f'{meic_spec} {year}-{mon_str}')
# ── Write output ──────────────────────────────────────────────────
output_spec = mapper.loc[meic_spec, 'output species']
res_str = str(output_res).replace('.', 'p')
output = os.path.join(
save_dir,
f'CINEI_{year}_{mon_name}_{output_spec}_'
f'{res_str}deg_{region_name}.nc')
myds.to_netcdf(output, format="NETCDF3_CLASSIC")
print(f"[CINEI] ✅ Saved: {output}")
# ── Auto-run VOC speciation if requested ──────────────────────────
if nmvoc_speciation and meic_spec.upper() == 'NMVOC':
print(f"\n[CINEI] Running NMVOC speciation...")
_nmvoc_speciation(
nmvoc_nc_path=output,
save_dir=save_dir,
sectors=active_sectors,
)
return output