diff --git a/config/forecasters-ich1_mec_ffv2.yaml b/config/forecasters-ich1_mec_ffv2.yaml new file mode 100644 index 00000000..f58190e3 --- /dev/null +++ b/config/forecasters-ich1_mec_ffv2.yaml @@ -0,0 +1,93 @@ +# yaml-language-server: $schema=../workflow/tools/config.schema.json +description: | + Evaluate skill of Stage E with/without cutoff edges trained with and without subgrid orography. + +dates: + start: 2025-07-26T00:00 + end: 2025-07-27T00:00 + frequency: 6h + +runs: + - forecaster: + inference_resources: + slurm_partition: normal-shared + checkpoint: https://service.meteoswiss.ch/mlstore#/experiments/602/runs/c30490b6ba064e4db03b430f3a2595ad + label: stage_E_icon_1km_cutoff_edges_subgrid_horography + steps: 0/24/6 + config: resources/inference/configs/sgm-multidataset-forecaster-global-ich1-oper.yaml + extra_requirements: + - git+https://github.com/ecmwf/anemoi-inference.git@e369b1a90313e9701db13f63364a467aa281cf36 + +baselines: + - baseline: + baseline_id: ICON-CH1-EPS + label: ICON-CH1-ctrl + root: /store_new/mch/msopr/ml/ICON-CH1-EPS + steps: 0/24/6 + + - baseline: + baseline_id: ICON-CH2-EPS + label: ICON-CH2-ctrl + root: /store_new/mch/msopr/ml/ICON-CH2-EPS + steps: 0/24/6 + + +truth: + label: KENDA-CH1 + root: /store_new/mch/msopr/ml/datasets/mch-ich1-1km-2024-2025-1h-pl13-v1.0.zarr + +experiment: + params: + - T_2M + - TD_2M + - U_10M + - V_10M + - TOT_PREC + stratification: + regions: + - jura + root: /scratch/mch/bhendj/regions/Prognoseregionen_LV95_20220517 + thresholds: + TOT_PREC: + gt: [0.0, 1, 5] + U_10M: + gt: [2.5, 5.0, 10.0] + V_10M: + gt: [2.5, 5.0, 10.0] + T_2M: + lt: [273.15] + gt: [288.15, 298.15] + dashboard: + stratification: + # - init_hour + # - region + - season + +locations: + output_root: output/ + +mec: + ekf_root: /store_new/mch/msopr/osm/KENDA-CH1/EKF + mon_synop_root: /scratch/mch/paa/mec/MEC_ML_input/monFiles2025 + ver_synop_root: /scratch/mch/paa/mec/MEC25_I-CH1 + +ffv2: + experiment_ids: Varda + experiment_description: 
Varda-Single_suboro + file_description: exp_Varda-Single_2025 + domain_table: /users/paa/01_store/02_FFV2/data/8_CH_box + blacklists: /users/paa/01_store/02_FFV2/data/blacklist + +profile: + executor: slurm + global_resources: + gpus: 16 + default_resources: + slurm_partition: "postproc" + cpus_per_task: 1 + mem_mb_per_cpu: 1800 + runtime: "1h" + gpus: 0 + jobs: 50 + batch_rules: + plot_forecast_frame: 32 diff --git a/resources/ffv2/template_SYNOP_DET.nl.jinja2 b/resources/ffv2/template_SYNOP_DET.nl.jinja2 new file mode 100644 index 00000000..4730d51e --- /dev/null +++ b/resources/ffv2/template_SYNOP_DET.nl.jinja2 @@ -0,0 +1,84 @@ + +# variables to verify. +varnoContinuous 'T2M,TD2M,RH2M,U10M,V10M,PS,FF,DD,GUST_6h,RR_6h' #,RR_6h,N,N_L,N_M,N_H,RAD_GL_1h +pecthresholds list('FF'=list('lower'=c(2),'upper'=c(7))) # hit rates (percent correct forecast) for the forecast to hit the observation within the given limits are calculated. +catthresholds list('T2M'=c(282,292),'FF'=c(2.5,5,10)) # no space allowed between variables + +# senseful and short description of the model versions to verify. +# example: 'D-metno,C-metno_rollout,C-cerra_rollout' +expIds '{{ experiment_ids }}' + + + +# location of the feedback files. All feedback files per model must be stored +# in one directory and the order must correspond to expIds +# Example: '/src/ffv2/input/,' +# fdbkDirs '{{ feedback_directories }}' +# Use a path in the container, to avoid mixups between absolute/relative paths. +# TODO: Change this once we support having more than one directory. +fdbkDirs '/src/ffv2/input' + +# should be however many models +# example: '-1,-1,-1' for 3 models +veri_ens_member '{{ veri_ens_member }}' + + +# the (existing!) output directory (absolute or relative to working directory) +#outDir '{{ output_directory }}' +# Use a path in the container, to avoid mixups between absolute/relative paths. 
+outDir '/src/ffv2/output' + +# string used to build the filenames of the intermediate scorefiles (one valid date). Only restriction: +# no other file in the output directory contains that string in the filename +# example: 'emulator_onPL_ALL_obs_2020' +expDescr '{{ experiment_description }}' + +# string used to build the filenames of the final scorefiles. +# please follow the naming convention explained on the wiki page +# This should match the output file for this from snakemake rule +# Example: 'exp_ACOSMO-2-models_C-2E-CTRL_2020' +fileDescr '{{ file_description }}' + + +# string to filter feedback file input. All feedback files that contain the string in their filename are used. +# regular expressions are allowed, for details see list.files() in R +filePattern 'verSYNOP' + +# switch experiment/routine verification. Set to T. (F only for DWD routine verification) +experiment 'T' + +# switch for testing if two model versions differ significantly (T/F). Set to T only if there are two or more model versions to verify +sigTest 'T' + +# subdomains to verify +subdomains 'ALL,USER' +# Location of domain table and blacklists. This data should be mounted from balfrin into +# the container. Recommend using the same path in src/dest, for simplicity. +# Example: '/users/paa/01_store/02_FFV2/data/7_ML_inner_polygon' +domainTable '{{ domain_table }}' +# Example: '/users/paa/01_store/02_FFV2/data/blacklist' +blacklists '{{ blacklists }}' + +# time binning steps for valid times (in minutes) relative to the valid time of the feedback file (timestamp in filename) read +# (time granularity of the verification) +# does not apply to data of more than one FF (because FF are read sequentially) +timeSteps '0' + +# range of the temporal bins in minutes +timeBreaks '30,-30' + +# set forecast times for which scores should be calculated. +# useful filter for lead times with very low LEN. +#veri_forecast_time '0000,1200,2400,3600,4800,6000,7200,8400,9600,10800,12000' + +# run class(es). 
See Table 27 in the Feedback File Definition (version of 29 Oct. 2019) +#veri_run_class '0,2' + +# run type(s). See Table 26 in the Feedback File Definition (version of 29 Oct. 2019) +veri_run_type '0,4' + +# state(s) of observation to be used. See Table 8 in the Feedback File Definition (version of 29 Oct. 2019) +#state '0,1,5,7,13' + +# state(s) of report to be used. See Table 8 in the Feedback File Definition (version of 29 Oct. 2019) +#r_state '0,1,5,7,13' diff --git a/resources/inference/configs/sgm-multidataset-forecaster-global-ich1-oper.yaml b/resources/inference/configs/sgm-multidataset-forecaster-global-ich1-oper.yaml index 12213293..41a85475 100644 --- a/resources/inference/configs/sgm-multidataset-forecaster-global-ich1-oper.yaml +++ b/resources/inference/configs/sgm-multidataset-forecaster-global-ich1-oper.yaml @@ -33,6 +33,9 @@ output: - extract_mask: # removes global points mask: "lam_0/cutout_mask" as_slice: true + - accumulate_from_start_of_forecast: # accumulate tp from start of forecast + accumulations: + - tp - grib: path: grib/ifs-{date}{time:04}_{step:03}.grib encoding: @@ -41,11 +44,14 @@ output: templates: samples: resources/templates_index_ifs.yaml post_processors: - - extract_mask: # removes lam points - mask: "lam_0/cutout_mask" - as_slice: true - inverse: true - - assign_mask: # fill local/global overlapping points with nan - mask: "global/cutout_mask" + - extract_mask: # removes lam points + mask: "lam_0/cutout_mask" + as_slice: true + inverse: true + - assign_mask: # fill local/global overlapping points with nan + mask: "global/cutout_mask" + - accumulate_from_start_of_forecast: # accumulate tp from start of forecast + accumulations: + - tp patch_metadata: resources/sgm-multidataset-ich1-oper-patch.yaml diff --git a/resources/mec/namelist.jinja2 b/resources/mec/namelist.jinja2 new file mode 100644 index 00000000..6abc9062 --- /dev/null +++ b/resources/mec/namelist.jinja2 @@ -0,0 +1,77 @@ 
+!============================================================================== +! namelist template for MEC +!============================================================================== + + !=================== + ! general parameters + !=================== + &run + method = 'GMESTAT' ! Model Equivalent Calculator + model = 'ML' ! forecast model. One of "COSMO" "ICON" "ML" + input = './input_mod' ! input data path + data = '/oprusers/osm/opr.inn/data/' ! data path for auxiliary data + obsinput = './input_obs' ! observation input data path + output = '.' ! output data to working directory + time_ana = {{ init_time }}00 ! analysis date YYYYMMDDHHMMSS + read_fields = 'ps u t v q geof t2m td2m u_10m v_10m' + grib_edition = 2 + grib_library = 2 ! GRIB-API used: 1=GRIBEX 2=GRIB2-API + cosmo_refatm = 2 ! reference atmosphere to be used for COSMO:1or2 + fc_hours = 0 ! Default is 3h. Has to be set to 0 if one wants to verify +0h leadtime + grid_file = '/oprusers/osm/opr.inn/data/grid_descriptions/icon_grid_0001_R19B08_mch.nc' ! grid description file for ICON (needed for REA-L) + nproc1 = 1 + nproc2 = 1 + / + + !=============================== + ! observation related parameters + !=============================== + &observations + !--------------------------------------------------- + ! read from CDFIN files (if not set use mon/cof/ekf) + !--------------------------------------------------- + read_cdfin = F ! (F): dont read COSMO CDFIN files get obs from ekf + vint_lin_t = T ! linear vertical interpolation for temperature + vint_lin_z = T ! linear vertical interpolation for geopotential + vint_lin_uv = T ! linear vertical interpolation for wind + ptop_lapse = 850. + pbot_lapse = 950. +! int_nn = T ! horizontal interpolation: nearest neighbor + / + + !==================== + ! Ensemble parameters + !==================== + &ENKF + k_enkf = 0 ! ensemble size (0 for det. run) + det_run = 1 ! 
set to 1 for deterministic run, 0 for ensemble + / + + !================================ + ! Verification related parameters + !================================ + &veri_obs + obstypes = "SYNOP" ! "SYNOP TEMP" + fc_times = {{ leadtimes }} ! forecast lead time at reference (hhmm) 0000,1200,2400,... + prefix_in = 'obs' ! prefix for input files. ekf or mon (or obs if a verSYNOP.nc file is used) + prefix_out = 'ver' + rm_old = 2 ! overwrite entries in verification file ? + fc_file = '_FCR_TIMEMM_.grib' ! template for forecast file name + time_range = 1 + ekf_concat = F + ref_runtype = 'any' ! accept any runtype for the reference state + / + + &report + time_b = -0029 ! (hhmm, inclusive) + time_e = 0030 ! (hhmm, exclusive) + / + + &cosmo_obs + lcd187 = .true. ! use ground based wind lidar obs + verification_start = -29 ! (min, inclusive) + verification_end = 30 ! (min, inclusive) + / + &synop_obs + version = 1 + / diff --git a/src/evalml/cli.py b/src/evalml/cli.py index 51a9ed45..a5d86153 100644 --- a/src/evalml/cli.py +++ b/src/evalml/cli.py @@ -151,7 +151,7 @@ def execute_workflow( if not verbose: command += ["--quiet", "rules"] # reduce verobosity of snakemake output - raise SystemExit(run_command(command)) + return run_command(command) @click.group(help="Evaluation workflows for ML experiments.") @@ -163,22 +163,54 @@ def cli(): @click.argument( "configfile", type=click.Path(exists=True, dir_okay=False, path_type=Path) ) +@click.option( + "--mec", + is_flag=True, + default=False, + help="Run MEC for Feedback file production.", +) +@click.option( + "--ffv2", is_flag=True, default=False, help="Run the FFV2 scorefile production." 
+) @workflow_options def experiment( - configfile, cores, verbose, dry_run, unlock, report, dag, rulegraph, extra_smk_args + configfile, + mec, + ffv2, + cores, + verbose, + dry_run, + unlock, + report, + dag, + rulegraph, + extra_smk_args, ): - execute_workflow( - configfile, - "experiment_all", - cores, - verbose, - dry_run, - unlock, - report, - dag, - rulegraph, - extra_smk_args, - ) + targets = [] + if mec or ffv2: + if mec: + targets.append("mec_all") + if ffv2: + targets.append("ffv2_all") + else: + targets.append("experiment_all") + + for target in targets: + rc = execute_workflow( + configfile, + target, + cores, + verbose, + dry_run, + unlock, + report, + dag, + rulegraph, + extra_smk_args, + ) + if rc: + raise SystemExit(rc) + raise SystemExit(0) @cli.command(help="Obtain showcase material as defined by a config YAML file.") @@ -189,17 +221,19 @@ def experiment( def showcase( configfile, cores, verbose, dry_run, unlock, report, dag, rulegraph, extra_smk_args ): - execute_workflow( - configfile, - "showcase_all", - cores, - verbose, - dry_run, - unlock, - report, - dag, - rulegraph, - extra_smk_args, + raise SystemExit( + execute_workflow( + configfile, + "showcase_all", + cores, + verbose, + dry_run, + unlock, + report, + dag, + rulegraph, + extra_smk_args, + ) ) @@ -211,17 +245,19 @@ def showcase( def sandbox( configfile, cores, verbose, dry_run, unlock, report, dag, rulegraph, extra_smk_args ): - execute_workflow( - configfile, - "sandbox_all", - cores, - verbose, - dry_run, - unlock, - report, - dag, - rulegraph, - extra_smk_args, + raise SystemExit( + execute_workflow( + configfile, + "sandbox_all", + cores, + verbose, + dry_run, + unlock, + report, + dag, + rulegraph, + extra_smk_args, + ) ) @@ -243,15 +279,17 @@ def make( rulegraph, extra_smk_args, ): - execute_workflow( - configfile, - target, - cores, - verbose, - dry_run, - unlock, - report, - dag, - rulegraph, - extra_smk_args, + raise SystemExit( + execute_workflow( + configfile, + target, + 
cores, + verbose, + dry_run, + unlock, + report, + dag, + rulegraph, + extra_smk_args, + ) ) diff --git a/src/evalml/config.py b/src/evalml/config.py index 3e89b239..c28e7361 100644 --- a/src/evalml/config.py +++ b/src/evalml/config.py @@ -410,6 +410,52 @@ def parsable(self) -> Dict[str, str]: return out +class MecConfig(BaseModel): + """Paths to input observation files for the MEC verification step.""" + + ekf_root: str = Field( + ..., + description="Root directory for EKF SYNOP files. Files are expected at {ekf_root}/{YYYYMM}/ekfSYNOP_{init}00.nc.", + ) + mon_synop_root: str = Field( + ..., + description="Root directory for monSYNOP files. Files are expected at {mon_synop_root}/{YYYYMMDDH}/monSYNOP.nc.", + ) + ver_synop_root: str = Field( + ..., + description="Root directory for reference verSYNOP files. Files are expected at {ver_synop_root}/verSYNOP_{init}00.nc.", + ) + + model_config = {"extra": "forbid"} + + +class Ffv2Config(BaseModel): + """Configuration for the FFV2 scoring pipeline.""" + + experiment_ids: str = Field( + ..., + description="Comma-separated experiment IDs passed to FFV2.", + ) + experiment_description: str = Field( + ..., + description="Short description of the experiment for FFV2 output files.", + ) + file_description: str = Field( + ..., + description="File description string used in FFV2 output file naming.", + ) + domain_table: str = Field( + ..., + description="Path to the domain table file (polygon) used by FFV2.", + ) + blacklists: str = Field( + ..., + description="Path to the blacklist directory used by FFV2.", + ) + + model_config = {"extra": "forbid"} + + class ConfigModel(BaseModel): """Top-level configuration.""" @@ -441,6 +487,14 @@ class ConfigModel(BaseModel): default_factory=ShowcaseConfig, description="Settings for the showcase workflow.", ) + mec: MecConfig | None = Field( + None, + description="Input observation paths for the MEC verification step. 
Required when running with --mec.", + ) + ffv2: Ffv2Config | None = Field( + None, + description="Configuration for the FFV2 scoring pipeline. Required when running with --ffv2.", + ) model_config = { "extra": "forbid", # fail on misspelled keys diff --git a/workflow/Snakefile b/workflow/Snakefile index 2f9f3c36..e4aca6ef 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -19,6 +19,7 @@ include: "rules/inference.smk" include: "rules/verification.smk" include: "rules/report.smk" include: "rules/plot.smk" +include: "rules/verif_obs.smk" # about workflow @@ -43,6 +44,12 @@ RESULTS_DIR = OUT_ROOT / "results" / EXPERIMENT_NAME # form results/{showcase}/{run_id}/... — constrain showcase to a single path component. wildcard_constraints: showcase=r"[^/]+", + run_id=r"[^/]+/[^/]+", + init_time=r"\d{12}", + + +# prefer one rule because snakemake complains about ambiguous rules (same output) +ruleorder: inference_prepare_forecaster > inference_prepare_interpolator # optional messages, log and error handling @@ -131,6 +138,10 @@ rule experiment_all: rules.verification_metrics_plot.output, experiment=EXPERIMENT_NAME, ), + expand( + OUT_ROOT / "data/runs/{run_id}/summary.md", + run_id=collect_all_candidates(), + ), rule showcase_all: @@ -175,10 +186,17 @@ rule inference_all: """Run inference for all reference times as defined in the configuration.""" input: expand( - OUT_ROOT / "data/runs/{run_id}/{init_time}/raw", + rules.inference_execute.output.okfile, init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES], run_id=CANDIDATES, ), + output: + run_all_ok=touch(OUT_ROOT / "logs/run_inference_all.ok"), + shell: + """ + mkdir -p $(dirname {output.run_all_ok}) + touch {output.run_all_ok} + """ rule verification_metrics_all: @@ -197,3 +215,22 @@ rule verification_metrics_plot_all: rules.verification_metrics_plot.output, experiment=EXPERIMENT_NAME, ), + + +rule mec_all: + """Target rule for MEC observation verification (produces fdbk_files).""" + input: + expand( + OUT_ROOT / 
"data/runs/{run_id}/fdbk_files/verSYNOP_{init_time}00.nc", + init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES_MEC], + run_id=CANDIDATES, + ), + + +rule ffv2_all: + """Target rule for FFV2 scoring pipeline (consumes fdbk_files, produces shiny scores).""" + input: + expand( + rules.reorganize_ffv2_files.output, + run_id=collect_all_candidates(), + ), diff --git a/workflow/rules/inference.smk b/workflow/rules/inference.smk index 87782c38..e9a30fd2 100644 --- a/workflow/rules/inference.smk +++ b/workflow/rules/inference.smk @@ -195,9 +195,7 @@ rule inference_prepare_forecaster: config=Path(OUT_ROOT / "data/runs/{run_id}/{init_time}/config.yaml"), resources=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/resources"), grib_out_dir=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/grib"), - okfile=touch( - OUT_ROOT / "logs/inference_prepare_forecaster/{run_id}-{init_time}.ok" - ), + okfile=OUT_ROOT / "logs/inference_prepare_forecaster/{run_id}-{init_time}.ok", log: OUT_ROOT / "logs/inference_prepare_forecaster/{run_id}-{init_time}.log", localrule: True @@ -236,9 +234,7 @@ rule inference_prepare_interpolator: resources=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/resources"), grib_out_dir=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/grib"), forecaster=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/forecaster"), - okfile=touch( - OUT_ROOT / "logs/inference_prepare_interpolator/{run_id}-{init_time}.ok" - ), + okfile=OUT_ROOT / "logs/inference_prepare_interpolator/{run_id}-{init_time}.ok", log: OUT_ROOT / "logs/inference_prepare_interpolator/{run_id}-{init_time}.log", localrule: True @@ -280,7 +276,7 @@ rule inference_execute: image=lambda wc: OUT_ROOT / f"data/runs/{RUN_CONFIGS[wc.run_id]['env_id']}/venv.squashfs", output: - okfile=touch(OUT_ROOT / "logs/inference_execute/{run_id}-{init_time}.ok"), + okfile=OUT_ROOT / "logs/inference_execute/{run_id}-{init_time}.ok", log: OUT_ROOT / "logs/inference_execute/{run_id}-{init_time}.log", 
localrule: True @@ -333,5 +329,6 @@ rule inference_execute: anemoi-inference run config.yaml "${{CMD_ARGS[@]}}" ' ) >{log} 2>&1 + touch {output.okfile} """ # fmt: on diff --git a/workflow/rules/verif_obs.smk b/workflow/rules/verif_obs.smk new file mode 100644 index 00000000..31cd523b --- /dev/null +++ b/workflow/rules/verif_obs.smk @@ -0,0 +1,366 @@ +from pathlib import Path +from datetime import timedelta + + +def _parse_steps(steps: str) -> list[int]: + # check that steps is in the format "start/stop/step" + if "/" not in steps: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + if len(steps.split("/")) != 3: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + start, end, step = map(int, steps.split("/")) + return list(range(start, end + 1, step)) + + +def _reftimes_mec(): + """ + Return the subset of REFTIMES that can serve as MEC init times. + + A time t is eligible only if all source init times required by link_mec_input + (i.e. t - l for every configured lead l) are also present in REFTIMES. + Works for both range-based and explicit-list date configs. 
+ """ + leads = _parse_steps(config["runs"][0]["forecaster"]["steps"]) + reftimes_set = set(REFTIMES) + return [ + t + for t in REFTIMES + if all(t - timedelta(hours=l) in reftimes_set for l in leads) + ] + + +REFTIMES_MEC = _reftimes_mec() + + +rule prepare_mec_input: + """Collect EKF SYNOP, monSYNOP, and reference verSYNOP observation files into the MEC input_obs directory.""" + input: + inference_ok=lambda wc: expand( + rules.inference_execute.output.okfile, + run_id=wc.run_id, + init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES], + ), + output: + obs=directory(OUT_ROOT / "data/runs/{run_id}/mec/{init_time}/input_obs"), + ekf_file=OUT_ROOT / "data/runs/{run_id}/mec/{init_time}/input_obs/ekfSYNOP.nc", + obs_file=OUT_ROOT / "data/runs/{run_id}/mec/{init_time}/input_obs/obsSYNOP.nc", + log: + OUT_ROOT / "logs/prepare_mec_input/{run_id}-{init_time}.log", + params: + ekf_root=config["mec"]["ekf_root"], + mon_synop_root=config["mec"]["mon_synop_root"], + ver_synop_root=config["mec"]["ver_synop_root"], + shell: + """ + ( + set -euo pipefail + + mkdir -p {output.obs} + + # extract YYYYMM from init_time (which is YYYYMMDDHHMM) + init="{wildcards.init_time}" + ym="${{init:0:6}}" + echo "init time: ${{init}}" + + # collect observations (ekfSYNOP) and/or (monSYNOP from DWD; includes precip) files + cp {params.ekf_root}/${{ym}}/ekfSYNOP_${{init}}00.nc {output.ekf_file} + cp {params.mon_synop_root}/${{init:0:10}}/monSYNOP.nc {output.obs}/monSYNOP.nc + cp {params.ver_synop_root}/verSYNOP_${{init}}00.nc {output.obs_file} + echo "Copied obs files to {output.obs}" + + ) >{log} 2>&1 + """ + + +rule link_mec_input: + """For each valid time, merge the two inference GRIB files 0h and - 6h and copy it +from the source init time into input_mod/, named by source init time. +This assembles the model input directory that MEC expects for a single valid time. 
+""" + input: + # depend on ALL source grib dirs: for each lead l, source_init = init_time - l hours + obs_file=rules.prepare_mec_input.output.obs_file, + src_dirs=lambda wc: expand( + str(OUT_ROOT / "data/runs/{run_id}/{src_init}/grib"), + run_id=wc.run_id, + src_init=[ + ( + datetime.strptime(wc.init_time, "%Y%m%d%H%M") - timedelta(hours=l) + ).strftime("%Y%m%d%H%M") + for l in _parse_steps(RUN_CONFIGS[wc.run_id]["steps"]) + ], + ), + output: + # own the final input_mod directory for this init (and its contents) + mod=directory(OUT_ROOT / "data/runs/{run_id}/mec/{init_time}/input_mod"), + log: + OUT_ROOT / "logs/link_mec_input/{run_id}-{init_time}.log", + params: + # generate a space-separated list of lead hours from the run config + leads=lambda wc: " ".join( + str(l) for l in _parse_steps(RUN_CONFIGS[wc.run_id]["steps"]) + ), + shell: + """ + ( + set -euo pipefail + + mkdir -p {output.mod} + cd {output.mod}/../../.. + + init="{wildcards.init_time}" + echo "Creating input_mod files for init $init (leads: {params.leads})" + + # for each configured lead copy (and optionally merge) source files into input_mod + for lead in {params.leads}; do + lead3=$(printf "%03d" "$lead") + # compute source init such that source_init + lead = ref(init) + src_epoch=$(date -u -d "${{init:0:4}}-${{init:4:2}}-${{init:6:2}}T${{init:8:2}}:${{init:10:2}}:00Z" +%s) + source_init=$(date -u -d "@$((src_epoch - lead * 3600))" +"%Y%m%d%H%M") + src_rel="$source_init/grib/${{source_init}}_${{lead3}}.grib" + + if [[ -e "$src_rel" ]]; then + dest="mec/{wildcards.init_time}/input_mod/${{source_init}}.grib" + if [[ "$lead" -eq 0 ]]; then + echo "Copying $src_rel -> $dest" + cp "$src_rel" "$dest" + else + prev_lead3=$(printf "%03d" "$((lead - 6))") + prev_rel="$source_init/grib/${{source_init}}_${{prev_lead3}}.grib" + if [[ -e "$prev_rel" ]]; then + echo "Merging $prev_rel + $src_rel -> $dest" + cat "$prev_rel" "$src_rel" >"$dest" + else + echo "WARNING: previous lead file $prev_rel not found, 
copying $src_rel only" >&2 + cp "$src_rel" "$dest" + fi + fi + else + echo "WARNING: source file $src_rel not found" >&2 + fi + done + ) >{log} 2>&1 + """ + + +rule generate_mec_namelist: + input: + script="workflow/scripts/generate_mec_namelist.py", + template="resources/mec/namelist.jinja2", + mod_dir=directory(rules.link_mec_input.output.mod), + output: + namelist=OUT_ROOT / "data/runs/{run_id}/mec/{init_time}/namelist", + localrule: True + params: + steps=lambda wc: RUN_CONFIGS[wc.run_id]["steps"], + shell: + """ + uv run {input.script} \ + --steps {params.steps} \ + --init_time {wildcards.init_time} \ + --template {input.template} \ + --namelist {output.namelist} + """ + + +rule sarus_pull_mec: + """Pull the MEC sarus container image once before parallel MEC jobs.""" + output: + touch(OUT_ROOT / "logs/sarus_pull_mec.ok"), + localrule: True + shell: + "sarus pull container-registry.meteoswiss.ch/mecctr/mec-container:0.1.0-main" + + +rule run_mec: + """Run the MEC container for one initialisation time, producing a verSYNOP feedback file in fdbk_files/.""" + input: + namelist=rules.generate_mec_namelist.output.namelist, + prepare_obs=rules.prepare_mec_input.output.obs_file, + mod_dir=directory(rules.link_mec_input.output.mod), + pull_ok=rules.sarus_pull_mec.output, + output: + fdbk_file=OUT_ROOT / "data/runs/{run_id}/fdbk_files/verSYNOP_{init_time}00.nc", + log: + OUT_ROOT / "logs/run_mec/{run_id}-{init_time}.log", + shell: + """ + ( + set -euo pipefail + + run_dir=$(dirname {input.namelist}) + abs_run_dir=$(realpath "$run_dir") + abs_mod_root=$(realpath "$run_dir/../..") # two levels up (so that all links are mounted to the container) + + # run container + sarus run \ + --mount=type=bind,source=$abs_run_dir,destination=/src/bin2 \ + --mount=type=bind,source=$abs_mod_root,destination=$abs_mod_root,readonly \ + --mount=type=bind,source=/oprusers/osm/opr.inn/data/,destination=/oprusers/osm/opr.inn/data/ \ + 
container-registry.meteoswiss.ch/mecctr/mec-container:0.1.0-main + + # Run MEC using local executable (Alternative to sarus container) + #cd "$run_dir" + #export LM_HOST=balfrin-ln003 + #source /oprusers/osm/opr.inn/abs/mec.env + #./mec > ./mec_out.log 2>&1 + + # copy the output file to the final location for the Feedback files + # and rename to match NWP conventions + mkdir -p "$run_dir/../../fdbk_files" + cp "$run_dir/verSYNOP.nc" "$run_dir/../../fdbk_files/verSYNOP_{wildcards.init_time}00.nc" + echo "...time at end of run_mec: $(date)" + ) >{log} 2>&1 + """ + + +rule generate_ffv2_namelist: + input: + script="workflow/scripts/generate_ffv2_namelist.py", + template="resources/ffv2/template_SYNOP_DET.nl.jinja2", + # Block on MEC running for all input times, since FFV2 is across feedback files. + mec_ok=lambda wc: expand( + rules.run_mec.output.fdbk_file, + run_id=wc.run_id, + init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES_MEC], + ), + output: + # Question: Definitely want to aggregate over init time, but will we have 1 run_ffv2 per run_id, or 1 run of ffv2 for all run_ids? + namelist=OUT_ROOT / "data/runs/{run_id}/SYNOP_DET.nl", + log: + OUT_ROOT / "logs/generate_ffv2_namelist/{run_id}.log", + localrule: True + params: + # TODO: We may want more than one directory here, if we are comparing models. + feedback_directory=lambda wc: str( + OUT_ROOT / f"data/runs/{wc.run_id}/fdbk_files" + ), + # Keeping this as a param. We will create it in run_ffv2 rule. + output_directory=lambda wc: str(OUT_ROOT / f"data/runs/{wc.run_id}/scores"), + # TODO: consider including run_ids here? 
+ experiment_ids=config["ffv2"]["experiment_ids"], + experiment_description=config["ffv2"]["experiment_description"], + file_description=config["ffv2"]["file_description"], + domain_table=config["ffv2"]["domain_table"], + blacklists=config["ffv2"]["blacklists"], + shell: + """ + ( + set -euo pipefail + mkdir -p {params.output_directory} + uv run {input.script} \ + --template {input.template} \ + --namelist {output.namelist} \ + --experiment_ids {params.experiment_ids:q} \ + --feedback_directories {params.feedback_directory} \ + --output_directory {params.output_directory} \ + --experiment_description {params.experiment_description:q} \ + --file_description {params.file_description:q} \ + --domain_table {params.domain_table:q} \ + --blacklists {params.blacklists:q} + ) >{log} 2>&1 + """ + + +rule sarus_pull_ffv2: + """Pull the FFV2 sarus container image once before the FFV2 job.""" + output: + touch(OUT_ROOT / "logs/sarus_pull_ffv2.ok"), + localrule: True + shell: + "sarus pull container-registry.meteoswiss.ch/ffv2ctr/ffv2-container:0.1.0-main" + + +rule run_ffv2: + input: + namelist=rules.generate_ffv2_namelist.output.namelist, + pull_ok=rules.sarus_pull_ffv2.output, + output: + scores=directory(OUT_ROOT / "data/runs/{run_id}/scores"), + log: + OUT_ROOT / "logs/run_ffv2/{run_id}.log", + params: + # domain_table and blacklists are locations on Balfrin, that will be + # mounted into container (with the same filepaths) + domain_table=rules.generate_ffv2_namelist.params.domain_table, + blacklists=rules.generate_ffv2_namelist.params.blacklists, + # QUESTION: Will we want to compare with other models? + # Need to specify this in order to mount it. + # Because namelist is a blocking input, and namelist generation + # blocks on the MEC run, this should be OK to just use as param. 
+ feedback_directory=rules.generate_ffv2_namelist.params.feedback_directory, + shell: + """ + ( + set -euo pipefail + echo "...time at start of run_ffv2: $(date)" + + # Create the output directory to hold scores, if it does not exist + mkdir -p {output.scores} + + namelist=$(realpath {input.namelist}) + domain_table={params.domain_table} + blacklists={params.blacklists} + # Mount needs to have source as absolute path + feedback_dir_abs=$(realpath {params.feedback_directory}) + output_dir_abs=$(realpath {output.scores}) + sarus run \ + --mount=type=bind,source=$namelist,destination=/src/ffv2/SYNOP_DET.nl \ + --mount=type=bind,source=$domain_table,destination=$domain_table \ + --mount=type=bind,source=$blacklists,destination=$blacklists \ + --mount=type=bind,source=$feedback_dir_abs,destination=/src/ffv2/input \ + --mount=type=bind,source=$output_dir_abs,destination=/src/ffv2/output \ + container-registry.meteoswiss.ch/ffv2ctr/ffv2-container:0.1.0-main + + echo "...time at end of run_ffv2: $(date)" + ) >{log} 2>&1 + """ + + +rule reorganize_ffv2_files: + input: + scores=rules.run_ffv2.output.scores, + output: + shiny_dir=directory(OUT_ROOT / "data/runs/{run_id}/shiny/"), + log: + OUT_ROOT / "logs/reorganize_ffv2_files/{run_id}.log", + localrule: True + shell: + """ + ( + set -euo pipefail + echo "...time at start of reorganize_ffv2_files: $(date)" + + input_dir_abs=$(realpath {input.scores}) + output_dir_abs=$(realpath {output.shiny_dir}) + + # move score files into app-specific subdirectories, for the Shiny app + # display. 
+ mkdir -p $output_dir_abs/fdbk_cont/data + mkdir -p $output_dir_abs/fdbk_cont_bystat/data + mkdir -p $output_dir_abs/fdbk_cont_ts/data + mkdir -p $output_dir_abs/fdbk_synop_categ/data + mkdir -p $output_dir_abs/fdbk_synop_categ_bystat/data + mkdir -p $output_dir_abs/fdbk_synop_categ_ts/data + + # DET surface continuous scores + cp $input_dir_abs/CONT_exp* $output_dir_abs/fdbk_cont/data/ + # DET surface continuous scores as time series + cp $input_dir_abs/CONT_TS_exp* $output_dir_abs/fdbk_cont_ts/data/ + # DET surface continuous by stations + cp $input_dir_abs/CONT_bs_exp* $output_dir_abs/fdbk_cont_bystat/data/ + + # Categorical verification against SYNOP + cp $input_dir_abs/CATEG_exp* $output_dir_abs/fdbk_synop_categ/data + cp $input_dir_abs/PEC_exp* $output_dir_abs/fdbk_synop_categ/data + + # Categorical verification against SYNOP as time series + # This is not presently generated, so skip. + #cp $input_dir_abs/CATEG_TS_exp* $output_dir_abs/fdbk_synop_categ_ts/data + + # Categorical verification against SYNOP by station + cp $input_dir_abs/CATEG_bs_exp* $output_dir_abs/fdbk_synop_categ_bystat/data + + echo "...time at end of reorganize_ffv2_files: $(date)" + ) >{log} 2>&1 + """ diff --git a/workflow/scripts/generate_ffv2_namelist.py b/workflow/scripts/generate_ffv2_namelist.py new file mode 100644 index 00000000..29f44162 --- /dev/null +++ b/workflow/scripts/generate_ffv2_namelist.py @@ -0,0 +1,146 @@ +import logging +import os +from argparse import ArgumentParser +from pathlib import Path + +import jinja2 + +LOG = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def _check_args(args): + ids = args.experiment_ids.split(",") + fdbk_dirs = args.feedback_directories.split(",") + # Allow list to end with comma + if ids[-1] == "": + ids.pop(-1) + if fdbk_dirs[-1] == "": + fdbk_dirs.pop(-1) + if len(ids) != len(fdbk_dirs): + raise ValueError( + "lengths of experiment IDs and 
feedback directories differ:" + f"{len(ids)} {args.experiment_ids} vs " + f"{len(fdbk_dirs)} {args.feedback_directories}" + ) + if not os.path.exists(args.output_directory) or not os.path.isdir( + args.output_directory + ): + raise FileNotFoundError( + f"output directory {args.output_directory} " + "does not exist (or is not a directory)" + ) + if not os.path.exists(args.domain_table): + raise FileNotFoundError( + f"domain table location {args.domain_table} " + "does not exist. Check that it is mounted correctly " + " in container." + ) + if not os.path.exists(args.blacklists): + raise FileNotFoundError( + f"blacklists location {args.blacklists} " + "does not exist. Check that it is mounted correctly " + " in container." + ) + for fdbk_dir in fdbk_dirs: + if not os.path.exists(fdbk_dir) or not os.path.isdir(fdbk_dir): + raise FileNotFoundError( + f"feedback directory {fdbk_dir} does not exist. Check that MEC has run." + ) + + +def _make_veri_ens_member(experiment_ids: str) -> str: + num_ids = len(experiment_ids.split(",")) + return ",".join(["-1"] * num_ids) + + +def main(args): + # Render template with provided args + context = { + "experiment_ids": args.experiment_ids, + "veri_ens_member": _make_veri_ens_member(args.experiment_ids), + "experiment_description": args.experiment_description, + "file_description": args.file_description, + "domain_table": args.domain_table, + "blacklists": args.blacklists, + } + template_path = Path(args.template) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(str(template_path.parent))) + template = env.get_template(template_path.name) + namelist = template.render(**context) + LOG.info(f"FFV2 namelist created: {namelist}") + + out_path = Path(str(args.namelist)) + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", encoding="utf-8") as f: + f.write(namelist) + + +if __name__ == "__main__": + parser = ArgumentParser( + description="Render a Jinja2 FFV2 namelist template from experiment metadata and 
path configuration." + ) + + parser.add_argument( + "--template", + type=str, + help="path to jinja template", + ) + + parser.add_argument( + "--namelist", + type=str, + help="full path to output namelist", + ) + + parser.add_argument( + "--experiment_ids", + type=str, + help="namelist variable: comma-separated list of models to compare", + ) + + parser.add_argument( + "--feedback_directories", + type=str, + help="namelist variable: comma-separated list of feedback directory locations", + ) + + parser.add_argument( + "--output_directory", + type=str, + help="namelist variable: location to output score files (must already exist)", + ) + + parser.add_argument( + "--experiment_description", + type=str, + help="namelist variable: string used to build the filenames of the " + "intermediate scorefiles", + ) + + parser.add_argument( + "--file_description", + type=str, + help="namelist variable: string used to build the filenames of the output", + ) + + parser.add_argument( + "--domain_table", + type=str, + help="namelist variable: path to domain table. This needs to be " + "available from container through mounting.", + ) + + parser.add_argument( + "--blacklists", + type=str, + help="namelist variable: path to blacklists. 
This needs to be " + "available from container through mounting.", + ) + + args = parser.parse_args() + _check_args(args) + + main(args) diff --git a/workflow/scripts/generate_mec_namelist.py b/workflow/scripts/generate_mec_namelist.py new file mode 100644 index 00000000..a0e8d97d --- /dev/null +++ b/workflow/scripts/generate_mec_namelist.py @@ -0,0 +1,91 @@ +import logging +from argparse import ArgumentParser +from datetime import datetime +from pathlib import Path + +import jinja2 + +LOG = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def _parse_steps(steps: str) -> int: + # check that steps is in the format "start/stop/step" + if "/" not in steps: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + if len(steps.split("/")) != 3: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + start, end, step = map(int, steps.split("/")) + return list(range(start, end + 1, step)) + + +def program_summary_log(args): + """Log a welcome message with the script information.""" + LOG.info("=" * 80) + LOG.info("Generating MEC namelist") + LOG.info("=" * 80) + LOG.info("Template: %s", args.template) + LOG.info("Valid time: %s", args.init_time.strftime("%Y%m%d%H%M")) + LOG.info("Lead times: %s", args.steps) + LOG.info("Output namelist: %s", args.namelist) + LOG.info("=" * 80) + + +def main(args): + program_summary_log(args) + # Include stop_h (inclusive). 
Produce strings like 0000,0600,1200,...,12000 + lead_hours = args.steps + leadtimes = ",".join(f"{h:02d}00" for h in lead_hours) + + # Render template with init_time and computed leadtimes + context = {"init_time": f"{args.init_time:%Y%m%d%H%M}", "leadtimes": leadtimes} + template_path = Path(args.template) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(str(template_path.parent))) + template = env.get_template(template_path.name) + namelist = template.render(**context) + # Ensure file ends with a newline (prevent editors/tools from removing final RETURN) + if not namelist.endswith("\n"): + namelist += "\n" + + out_path = Path(str(args.namelist)) + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", encoding="utf-8") as f: + f.write(namelist) + + +if __name__ == "__main__": + parser = ArgumentParser( + description="Render a Jinja2 MEC namelist template for a given initialisation time and lead times." + ) + + parser.add_argument( + "--steps", + type=_parse_steps, + default="0/120/6", + help="Forecast lead times formatted as 'start/end/step' (hours), e.g. '0/120/6'.", + ) + + parser.add_argument( + "--init_time", + type=lambda s: datetime.strptime(s, "%Y%m%d%H%M"), + default="202010010000", + help="Initialisation time in YYYYMMDDHHmm format, e.g. 
'202010010000'.", + ) + + parser.add_argument( + "--template", + type=str, + help="Path to the Jinja2 namelist template file.", + ) + + parser.add_argument( + "--namelist", + type=str, + help="Full path to the output namelist file to be written.", + ) + + args = parser.parse_args() + + main(args) diff --git a/workflow/scripts/inference_prepare.py b/workflow/scripts/inference_prepare.py index 839217ec..b50ae08f 100644 --- a/workflow/scripts/inference_prepare.py +++ b/workflow/scripts/inference_prepare.py @@ -99,6 +99,9 @@ def prepare_interpolator(smk): config_content = f.read() LOG.info("Config: \n%s", config_content) + okfile = Path(smk.output.okfile) + okfile.parent.mkdir(parents=True, exist_ok=True) + okfile.touch() LOG.info("Interpolator preparation complete.") @@ -124,6 +127,9 @@ def prepare_forecaster(smk): config_content = f.read() LOG.info("Config: \n%s", config_content) + okfile = Path(smk.output.okfile) + okfile.parent.mkdir(parents=True, exist_ok=True) + okfile.touch() LOG.info("Forecaster preparation complete.") diff --git a/workflow/tools/config.schema.json b/workflow/tools/config.schema.json index 566b54bd..44a62fa0 100644 --- a/workflow/tools/config.schema.json +++ b/workflow/tools/config.schema.json @@ -274,6 +274,46 @@ "title": "ExplicitDates", "type": "array" }, + "Ffv2Config": { + "additionalProperties": false, + "description": "Configuration for the FFV2 scoring pipeline.", + "properties": { + "experiment_ids": { + "description": "Comma-separated experiment IDs passed to FFV2.", + "title": "Experiment Ids", + "type": "string" + }, + "experiment_description": { + "description": "Short description of the experiment for FFV2 output files.", + "title": "Experiment Description", + "type": "string" + }, + "file_description": { + "description": "File description string used in FFV2 output file naming.", + "title": "File Description", + "type": "string" + }, + "domain_table": { + "description": "Path to the domain table file (polygon) used by FFV2.", + 
"title": "Domain Table", + "type": "string" + }, + "blacklists": { + "description": "Path to the blacklist directory used by FFV2.", + "title": "Blacklists", + "type": "string" + } + }, + "required": [ + "experiment_ids", + "experiment_description", + "file_description", + "domain_table", + "blacklists" + ], + "title": "Ffv2Config", + "type": "object" + }, "ForecasterConfig": { "additionalProperties": false, "description": "Single training run stored in MLflow.", @@ -574,6 +614,34 @@ "title": "Locations", "type": "object" }, + "MecConfig": { + "additionalProperties": false, + "description": "Paths to input observation files for the MEC verification step.", + "properties": { + "ekf_root": { + "description": "Root directory for EKF SYNOP files. Files are expected at {ekf_root}/{YYYYMM}/ekfSYNOP_{init}00.nc.", + "title": "Ekf Root", + "type": "string" + }, + "mon_synop_root": { + "description": "Root directory for monSYNOP files. Files are expected at {mon_synop_root}/{YYYYMMDDH}/monSYNOP.nc.", + "title": "Mon Synop Root", + "type": "string" + }, + "ver_synop_root": { + "description": "Root directory for reference verSYNOP files. Files are expected at {ver_synop_root}/verSYNOP_{init}00.nc.", + "title": "Ver Synop Root", + "type": "string" + } + }, + "required": [ + "ekf_root", + "mon_synop_root", + "ver_synop_root" + ], + "title": "MecConfig", + "type": "object" + }, "MeteogramConfig": { "description": "Configuration for meteogram generation.", "properties": { @@ -792,6 +860,30 @@ "showcase": { "$ref": "#/$defs/ShowcaseConfig", "description": "Settings for the showcase workflow." + }, + "mec": { + "anyOf": [ + { + "$ref": "#/$defs/MecConfig" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Input observation paths for the MEC verification step. Required when running with --mec." 
+ }, + "ffv2": { + "anyOf": [ + { + "$ref": "#/$defs/Ffv2Config" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Configuration for the FFV2 scoring pipeline. Required when running with --ffv2." } }, "required": [