# =============================================================
# ALGO - DOCUMENT HEADER
# -------------------------------------------------------------
# Doc ID    : B6
# Title     : CORE - EA OUTPUT VALIDATOR STRICT (ALL REQUIRED)
# Filename  : B6 - CORE - EA OUTPUT VALIDATOR STRICT (ALL REQUIRED) - V06.py
# Version   : V08
# Updated   : 2026-04-25 16:35 (Europe/Rome)
# Status    : ACTIVE (ENFORCEMENT / NON-NEGOTIABLE)
# Changelog :
# - V08: added validator support for the optional `logic_occurrences` block
#        (occurrence-based audit and explicit provenance).
# - V07: added validator support for the optional `logic_state` block
#        (state-based boolean series for Condition, Pattern and Position State).
# - V06: fixed the `pattern.mre` check: MRE is the minimum allowed EXEC
#        timeframe, so it may be equal to or finer than `pattern_tf`; the
#        validator must not reject daily patterns with an intraday MRE.
# - V05: added validator support for the optional `meta.runtime_anchor_dt` on
#        logical events, so the runtime chronology can differ from the logical
#        certification timestamp.
# - V04: clarified the difference between the document version and the
#        contract_version of the validated payload.
# - V03: aligned with B3-V04 and B2-V03.
# - V03: strategy_params split into variables/constants.
# - V03: pattern_registry aligned with Pattern TF, MRE and Pattern Symbol.
# - V03: deep validation updated from flat pattern_dfs to pattern_contexts.
# =============================================================

from __future__ import annotations

from typing import Any, Dict, Optional, Set
from zoneinfo import ZoneInfo

import pandas as pd

CONTRACT_ID = "ALGO_EA_OUTPUT"
CONTRACT_VERSION = "B3-V04"  # Payload contract version currently enforced by validator.
DATASET_TZ = ZoneInfo("America/New_York")

ALLOWED_SIDES = {"LONG", "SHORT"}
ALLOWED_EVENT_TYPES = {"CONDITION", "PATTERN", "ENTRY", "EXIT"}

CANONICAL_TIMEFRAMES = {
    "S1", "M1", "M3", "M5", "M10", "M15", "M30",
    "H1", "H2", "H4", "H6",
    "D", "W", "M", "Y",
    "EXEC",
}
DB_TIMEFRAMES = CANONICAL_TIMEFRAMES - {"EXEC"}

EXTRA_FORBIDDEN_TOP_LEVEL_KEYS: Set[str] = {
    "equity", "returns", "drawdown", "metrics", "stats",
    "logs", "debug", "warning", "error",
}

TOP_LEVEL_KEYS: Set[str] = {
    "contract_id", "contract_version", "generated_at_utc",
    "symbol", "timeframe", "period",
    "strategy_name", "strategy_version", "strategy_code", "strategy_type",
    "strategy_params", "pattern_registry", "ai_description",
    "trades", "build", "meta",
}
OPTIONAL_TOP_LEVEL_KEYS: Set[str] = {"logic_state", "logic_occurrences"}

LOGIC_STATE_SCHEMA_VERSION = "LS-V01"
REQUIRED_LOGIC_STATE_KEYS: Set[str] = {"schema_version", "series", "meta"}
REQUIRED_LOGIC_STATE_SERIES_KEYS: Set[str] = {
    "state_id", "state_type", "state_tf", "visual_preset", "points", "meta",
}
REQUIRED_LOGIC_STATE_POINT_KEYS: Set[str] = {"dt", "value"}
ALLOWED_LOGIC_STATE_TYPES = {"CONDITION", "PATTERN", "POSITION"}
ALLOWED_VISUAL_PRESETS = {
    "event_only", "until_cert", "until_entry", "until_exit",
    "while_setup_active", "while_position_open", "sticky_historical",
    "boolean_bar_by_bar", "custom_rule",
}

LOGIC_OCCURRENCES_SCHEMA_VERSION = "LO-V01"
REQUIRED_LOGIC_OCCURRENCES_KEYS: Set[str] = {"schema_version", "items", "meta"}
REQUIRED_LOGIC_OCCURRENCE_ITEM_KEYS: Set[str] = {
    "occurrence_id", "entity_id", "entity_type",
    "logic_label_short", "logic_label_long",
    "semantic_role", "status", "occurrence_tf",
    "start_dt", "start_price", "cert_dt", "cert_price",
    "depends_on_occurrence_ids", "used_by_trade_ids", "meta",
}
ALLOWED_LOGIC_OCCURRENCE_TYPES = {"CONDITION", "PATTERN", "ENTRY", "EXIT", "POSITION"}
ALLOWED_LOGIC_OCCURRENCE_ROLES = {
    "anchor", "candidate", "confirmation", "filter", "gate",
    "pattern_complete", "entry_trigger", "exit_trigger",
    "position_state", "invalidation", "replacement",
}
ALLOWED_LOGIC_OCCURRENCE_STATUS = {
    "candidate", "surviving_candidate", "certified",
    "selected", "superseded", "invalidated",
}

REQUIRED_BUILD_KEYS: Set[str] = {
    "algo_version", "runtime_version", "ea_template_version", "validator_version",
    "git_commit", "python_version", "build_machine",
    "dataset_timezone", "output_timezone", "dt_convention",
}

REQUIRED_STRATEGY_PARAM_ROOT_KEYS: Set[str] = {"variables", "constants"}
REQUIRED_VARIABLE_KEYS: Set[str] = {
    "initial_capital", "position_size", "commission_per_trade", "slippage_ticks",
}
REQUIRED_CONSTANT_KEYS: Set[str] = {"tick_size", "point_value"}

REQUIRED_REGISTRY_KEYS: Set[str] = {"registry_version", "parameters", "patterns", "meta"}
REQUIRED_REGISTRY_PARAMETER_KEYS: Set[str] = {"variables", "constants"}
REQUIRED_PATTERN_KEYS: Set[str] = {
    "pattern_id", "pattern_symbol", "title", "summary", "pattern_tf", "mre",
    "conditions", "entries", "exits", "completion_rule",
}
REQUIRED_CONDITION_KEYS: Set[str] = {
    "condition_id", "description", "easy_language", "tf", "certification_rule",
}
REQUIRED_ENTRY_KEYS: Set[str] = {"entry_id", "description", "easy_language", "tf", "fill_policy"}
REQUIRED_EXIT_KEYS: Set[str] = {"exit_id", "description", "easy_language", "tf", "fill_policy"}

REQUIRED_TRADE_KEYS: Set[str] = {
    "id", "side", "qty",
    "entry_dt", "entry_price", "entry_bar",
    "exit_dt", "exit_price", "exit_bar",
    "stop_price",
    "mae_ticks", "mfe_ticks", "mae_points", "mfe_points", "mae_ccy_1c", "mfe_ccy_1c",
    "pnl_gross", "commission", "slippage", "pnl_net",
    "events", "meta",
}
REQUIRED_EVENT_KEYS: Set[str] = {
    "event_id", "event_type", "tf",
    "start_dt", "start_price",
    "cert_tf", "cert_dt", "cert_price",
    "meta",
}

# Bar duration per DB timeframe token. W/M/Y use calendar-aware DateOffset.
_TF_OFFSETS = {
    "S1": pd.Timedelta(seconds=1),
    "M1": pd.Timedelta(minutes=1),
    "M3": pd.Timedelta(minutes=3),
    "M5": pd.Timedelta(minutes=5),
    "M10": pd.Timedelta(minutes=10),
    "M15": pd.Timedelta(minutes=15),
    "M30": pd.Timedelta(minutes=30),
    "H1": pd.Timedelta(hours=1),
    "H2": pd.Timedelta(hours=2),
    "H4": pd.Timedelta(hours=4),
    "H6": pd.Timedelta(hours=6),
    "D": pd.Timedelta(days=1),
    "W": pd.DateOffset(weeks=1),
    "M": pd.DateOffset(months=1),
    "Y": pd.DateOffset(years=1),
}

# Timeframes ordered from coarsest (rank 0) to finest (highest rank).
_TF_RANK_ORDER = [
    "Y", "M", "W", "D", "H6", "H4", "H2", "H1",
    "M30", "M15", "M10", "M5", "M3", "M1", "S1",
]
_TF_RANK = {token: rank for rank, token in enumerate(_TF_RANK_ORDER)}

# event_type -> registry field name used in cross-check error messages.
_EVENT_REGISTRY_FIELDS = {
    "PATTERN": "pattern_id",
    "CONDITION": "condition_id",
    "ENTRY": "entry_id",
    "EXIT": "exit_id",
}

# entity_type -> registry collection name used in logic_occurrences messages.
_OCCURRENCE_REGISTRY_LABELS = {
    "CONDITION": "conditions",
    "PATTERN": "patterns",
    "ENTRY": "entries",
    "EXIT": "exits",
}


def _ensure(cond: bool, msg: str) -> None:
    """Raise ``ValueError(msg)`` when *cond* is falsy."""
    if not cond:
        raise ValueError(msg)


def _float_close(a: float, b: float, tol: float = 1e-8) -> bool:
    """Return True when *a* and *b* differ by at most the absolute tolerance *tol*."""
    return abs(float(a) - float(b)) <= tol


def _is_member(value: Any, allowed: Set[str]) -> bool:
    """Return True when *value* is a string contained in *allowed*.

    The ``isinstance`` guard keeps unhashable payload values (lists, dicts)
    from raising ``TypeError`` in the set lookup, so callers can report them
    as an ordinary validation failure.
    """
    return isinstance(value, str) and value in allowed


def _as_float(value: Any, label: str) -> float:
    """Convert *value* with ``float()``, raising ``ValueError`` naming *label* on failure."""
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError) as exc:
        raise ValueError(f"{label} must be numeric") from exc


def _is_utc_iso_datetime(s: Any) -> bool:
    """Return True when *s* is a non-empty, tz-aware datetime string at UTC offset zero."""
    if not isinstance(s, str) or not s.strip():
        return False
    try:
        ts = pd.Timestamp(s)
    except Exception:
        # Deliberately broad: this is a predicate, any parse failure means "no".
        return False
    if ts.tzinfo is None:
        return False
    try:
        # Inspect the offset of the timestamp as written. Converting to UTC
        # first would make the comparison true for every tz-aware input.
        return ts.utcoffset() == pd.Timedelta(0)
    except Exception:
        return False


def _df_dt_to_utc_series(df: pd.DataFrame) -> pd.Series:
    """Return ``df['dt']`` as UTC timestamps, localizing naive values to DATASET_TZ.

    Unparseable values become ``NaT``.

    Raises:
        ValueError: if the column is missing or no value could be parsed.
    """
    _ensure("dt" in df.columns, "df must have column 'dt'")
    parsed = pd.to_datetime(df["dt"], errors="coerce")
    _ensure(not parsed.isna().all(), "df.dt could not be parsed to datetime")

    converted = []
    for value in parsed:
        if pd.isna(value):
            converted.append(pd.NaT)
            continue
        ts = pd.Timestamp(value)
        if ts.tzinfo is None:
            ts = ts.tz_localize(DATASET_TZ)
        converted.append(ts.tz_convert("UTC"))
    return pd.Series(converted, index=df.index)


def _event_runtime_dt(event: Dict[str, Any]) -> pd.Timestamp:
    """Return ``meta.runtime_anchor_dt`` in UTC when provided, else the event ``cert_dt``.

    Raises:
        ValueError: if ``meta.runtime_anchor_dt`` is present but not a UTC ISO string.
    """
    meta = event.get("meta")
    if isinstance(meta, dict):
        runtime_anchor_dt = meta.get("runtime_anchor_dt")
        if runtime_anchor_dt is not None:
            _ensure(
                _is_utc_iso_datetime(runtime_anchor_dt),
                f"event.meta.runtime_anchor_dt must be UTC ISO-8601 string (...Z) when provided: {event.get('event_id')}",
            )
            return pd.Timestamp(runtime_anchor_dt).tz_convert("UTC")
    return pd.Timestamp(event["cert_dt"]).tz_convert("UTC")


def _event_logical_bar_start_dt(event: Dict[str, Any]) -> Optional[pd.Timestamp]:
    """Return ``meta.logical_bar_start_dt`` in UTC, or None when it is not provided.

    Raises:
        ValueError: if the value is present but not a UTC ISO string.
    """
    meta = event.get("meta")
    if not isinstance(meta, dict):
        return None
    logical_bar_start_dt = meta.get("logical_bar_start_dt")
    if logical_bar_start_dt is None:
        return None
    _ensure(
        _is_utc_iso_datetime(logical_bar_start_dt),
        f"event.meta.logical_bar_start_dt must be UTC ISO-8601 string (...Z) when provided: {event.get('event_id')}",
    )
    return pd.Timestamp(logical_bar_start_dt).tz_convert("UTC")


def _parse_tf_to_offset(tf: str):
    """Return the bar duration (Timedelta or DateOffset) for a DB timeframe token.

    The token is stripped and upper-cased before lookup.

    Raises:
        ValueError: if the token is not one of DB_TIMEFRAMES.
    """
    token = str(tf or "").strip().upper()
    _ensure(token in DB_TIMEFRAMES, f"Invalid DB timeframe token: {tf!r}")
    return _TF_OFFSETS[token]


def _lookup_close_at_event_time(
    close_map: Dict[pd.Timestamp, float],
    event_dt_utc: pd.Timestamp,
    tf_dur,
    context: str,
) -> float:
    """Return the close mapped to *event_dt_utc*, or to ``event_dt_utc - tf_dur``.

    Raises:
        ValueError: if neither key exists or the offset cannot be applied.
    """
    if event_dt_utc in close_map:
        return close_map[event_dt_utc]
    try:
        dt_minus = event_dt_utc - tf_dur
    except Exception as exc:
        raise ValueError(f"Cannot compute tf offset for {context}: {exc}") from exc
    if dt_minus in close_map:
        return close_map[dt_minus]
    raise ValueError(f"Cannot map {context} dt={event_dt_utc} to any bar start (dt or dt-tf).")


def _lookup_close_for_event(
    event: Dict[str, Any],
    close_map: Dict[pd.Timestamp, float],
    event_dt_utc: pd.Timestamp,
    tf_dur,
    context: str,
) -> float:
    """Return the expected close for *event*.

    When ``meta.logical_bar_start_dt`` is provided it must be a key of
    *close_map*; otherwise the lookup falls back to the event time.

    Raises:
        ValueError: if no matching bar start is found.
    """
    logical_bar_start_dt = _event_logical_bar_start_dt(event)
    if logical_bar_start_dt is None:
        return _lookup_close_at_event_time(close_map, event_dt_utc, tf_dur, context)
    if logical_bar_start_dt in close_map:
        return close_map[logical_bar_start_dt]
    raise ValueError(
        f"Cannot map {context} logical_bar_start_dt={logical_bar_start_dt} to any known bar start."
    )


def _collect_registry_ids(reg: Dict[str, Any]) -> Dict[str, Set[str]]:
    """Collect pattern/condition/entry/exit ids from the registry, keyed by entity type."""
    ids: Dict[str, Set[str]] = {
        "PATTERN": set(),
        "CONDITION": set(),
        "ENTRY": set(),
        "EXIT": set(),
    }
    for pattern in reg.get("patterns", []):
        ids["PATTERN"].add(pattern["pattern_id"])
        for condition in pattern.get("conditions", []):
            ids["CONDITION"].add(condition["condition_id"])
        for entry in pattern.get("entries", []):
            ids["ENTRY"].add(entry["entry_id"])
        for exit_ in pattern.get("exits", []):
            ids["EXIT"].add(exit_["exit_id"])
    return ids


def _validate_top_level(result: Dict[str, Any]) -> None:
    """Check the top-level key set plus scalar fields and container types."""
    keys = set(result.keys())
    missing = sorted(TOP_LEVEL_KEYS - keys)
    _ensure(not missing, f"Missing required top-level keys: {missing}")
    # Checked before the generic "unknown" test: forbidden keys are also
    # unknown keys, so the specific message was otherwise unreachable.
    forbidden = sorted(keys & EXTRA_FORBIDDEN_TOP_LEVEL_KEYS)
    _ensure(not forbidden, f"Forbidden top-level keys present: {forbidden}")
    unknown = sorted(keys - TOP_LEVEL_KEYS - OPTIONAL_TOP_LEVEL_KEYS)
    _ensure(not unknown, f"Unknown top-level keys (forbidden): {unknown}")

    _ensure(result["contract_id"] == CONTRACT_ID, "contract_id mismatch.")
    _ensure(result["contract_version"] == CONTRACT_VERSION, "contract_version mismatch.")
    _ensure(
        _is_utc_iso_datetime(result["generated_at_utc"]),
        "generated_at_utc must be UTC ISO-8601 string (...Z).",
    )

    string_fields = [
        "symbol", "timeframe", "period",
        "strategy_name", "strategy_version", "strategy_code", "strategy_type",
    ]
    for k in string_fields:
        _ensure(isinstance(result[k], str), f"{k} must be string.")
        if k != "period":  # period is the only string field allowed to be empty
            _ensure(bool(result[k].strip()), f"{k} must be non-empty string.")

    _ensure(result["timeframe"] in DB_TIMEFRAMES, f"timeframe must be one of {sorted(DB_TIMEFRAMES)}")
    _ensure(result["strategy_type"] == "EA", "strategy_type must be 'EA'.")

    _ensure(isinstance(result["strategy_params"], dict), "strategy_params must be dict.")
    _ensure(isinstance(result["pattern_registry"], dict), "pattern_registry must be dict.")
    _ensure(isinstance(result["ai_description"], str), "ai_description must be string.")
    _ensure(isinstance(result["trades"], list), "trades must be list.")
    _ensure(isinstance(result["build"], dict), "build must be dict.")
    _ensure(isinstance(result["meta"], dict), "meta must be dict.")


def _validate_build(build: Dict[str, Any]) -> None:
    """Check the ``build`` block: required string keys and fixed convention values."""
    miss_build = sorted(REQUIRED_BUILD_KEYS - set(build.keys()))
    _ensure(not miss_build, f"Missing build keys: {miss_build}")
    # Sorted so the first reported error does not depend on set iteration order.
    for bk in sorted(REQUIRED_BUILD_KEYS):
        _ensure(isinstance(build[bk], str), f"build.{bk} must be string.")

    _ensure(build["dataset_timezone"] == "America/New_York", "build.dataset_timezone must be 'America/New_York'.")
    _ensure(build["output_timezone"] == "UTC", "build.output_timezone must be 'UTC'.")
    _ensure(build["dt_convention"] == "BAR_START", "build.dt_convention must be 'BAR_START'.")


def _validate_strategy_params(sp: Dict[str, Any]) -> tuple:
    """Check ``strategy_params`` and return its ``(variables, constants)`` dicts."""
    miss_sp_root = sorted(REQUIRED_STRATEGY_PARAM_ROOT_KEYS - set(sp.keys()))
    _ensure(not miss_sp_root, f"Missing required strategy_params root keys: {miss_sp_root}")
    extra_sp_root = sorted(set(sp.keys()) - REQUIRED_STRATEGY_PARAM_ROOT_KEYS)
    _ensure(not extra_sp_root, f"Forbidden strategy_params root keys: {extra_sp_root}")
    _ensure(isinstance(sp["variables"], dict), "strategy_params.variables must be dict")
    _ensure(isinstance(sp["constants"], dict), "strategy_params.constants must be dict")

    variables = sp["variables"]
    constants = sp["constants"]
    miss_vars = sorted(REQUIRED_VARIABLE_KEYS - set(variables.keys()))
    miss_consts = sorted(REQUIRED_CONSTANT_KEYS - set(constants.keys()))
    _ensure(not miss_vars, f"Missing strategy_params.variables keys: {miss_vars}")
    _ensure(not miss_consts, f"Missing strategy_params.constants keys: {miss_consts}")

    reserved_vars = sorted(k for k in variables if isinstance(k, str) and k.startswith("_"))
    reserved_consts = sorted(k for k in constants if isinstance(k, str) and k.startswith("_"))
    _ensure(not reserved_vars, f"strategy_params.variables contains forbidden reserved keys: {reserved_vars}")
    _ensure(not reserved_consts, f"strategy_params.constants contains forbidden reserved keys: {reserved_consts}")

    # _as_float reports non-numeric values as ValueError rather than TypeError.
    _ensure(
        _as_float(variables["initial_capital"], "strategy_params.variables.initial_capital") > 0,
        "strategy_params.variables.initial_capital must be > 0",
    )
    _ensure(
        _as_float(variables["position_size"], "strategy_params.variables.position_size") > 0,
        "strategy_params.variables.position_size must be > 0",
    )
    _ensure(
        _as_float(variables["commission_per_trade"], "strategy_params.variables.commission_per_trade") >= 0,
        "strategy_params.variables.commission_per_trade must be >= 0",
    )
    _ensure(
        _as_float(constants["tick_size"], "strategy_params.constants.tick_size") > 0,
        "strategy_params.constants.tick_size must be > 0",
    )
    _ensure(
        _as_float(constants["point_value"], "strategy_params.constants.point_value") > 0,
        "strategy_params.constants.point_value must be > 0",
    )

    st = variables["slippage_ticks"]
    _ensure(isinstance(st, (int, float)), "strategy_params.variables.slippage_ticks must be numeric")
    _ensure(float(st).is_integer(), "strategy_params.variables.slippage_ticks must be integer-valued")
    return variables, constants


def _validate_event(e: Any, seen_event_ids: Set[str]) -> None:
    """Check one trade event and record its id in *seen_event_ids*."""
    _ensure(isinstance(e, dict), "Each event must be dict.")
    ekeys = set(e.keys())
    miss_e = sorted(REQUIRED_EVENT_KEYS - ekeys)
    _ensure(not miss_e, f"Event missing required keys: {miss_e}")
    extra_e = sorted(ekeys - REQUIRED_EVENT_KEYS)
    _ensure(not extra_e, f"Event has forbidden extra keys: {extra_e}")
    _ensure(isinstance(e["meta"], dict), "event.meta must be dict")

    _ensure(isinstance(e["event_id"], str) and e["event_id"].strip(), "event_id must be non-empty string")
    _ensure(e["event_id"] not in seen_event_ids, f"Duplicate event_id in same trade: {e['event_id']}")
    seen_event_ids.add(e["event_id"])

    _ensure(_is_member(e["event_type"], ALLOWED_EVENT_TYPES), f"event_type must be one of {sorted(ALLOWED_EVENT_TYPES)}")
    _ensure(isinstance(e["tf"], str) and e["tf"].strip(), "event.tf must be non-empty string")
    _ensure(isinstance(e["cert_tf"], str) and e["cert_tf"].strip(), "event.cert_tf must be non-empty string")
    _ensure(e["tf"] in CANONICAL_TIMEFRAMES, f"event.tf must be canonical token: {e['tf']}")
    _ensure(e["cert_tf"] in CANONICAL_TIMEFRAMES, f"event.cert_tf must be canonical token: {e['cert_tf']}")

    _ensure(_is_utc_iso_datetime(e["start_dt"]), "event.start_dt must be UTC ISO-8601 string (...Z).")
    _ensure(_is_utc_iso_datetime(e["cert_dt"]), "event.cert_dt must be UTC ISO-8601 string (...Z).")
    sdt = pd.Timestamp(e["start_dt"]).tz_convert("UTC")
    cdt = pd.Timestamp(e["cert_dt"]).tz_convert("UTC")
    _ensure(sdt <= cdt, "event.start_dt must be <= event.cert_dt")

    _ensure(isinstance(e["start_price"], (int, float)) and float(e["start_price"]) > 0, "event.start_price must be > 0")
    _ensure(isinstance(e["cert_price"], (int, float)) and float(e["cert_price"]) > 0, "event.cert_price must be > 0")


def _validate_event_sequence(events: list) -> None:
    """Check that every event type is present and that PATTERN <= ENTRY <= EXIT in time."""
    by_type: Dict[str, list] = {etype: [] for etype in ALLOWED_EVENT_TYPES}
    for e in events:
        by_type[e["event_type"]].append(e)

    pattern_events = by_type["PATTERN"]
    entry_events = by_type["ENTRY"]
    exit_events = by_type["EXIT"]
    condition_events = by_type["CONDITION"]

    _ensure(pattern_events, "Trade must include at least one PATTERN event.")
    _ensure(entry_events, "Trade must include at least one ENTRY event.")
    _ensure(exit_events, "Trade must include at least one EXIT event.")
    _ensure(condition_events, "Trade must include at least one CONDITION event.")

    _ensure(all(e["tf"] == "EXEC" for e in entry_events), "All ENTRY events must use tf=EXEC")
    _ensure(all(e["tf"] == "EXEC" for e in exit_events), "All EXIT events must use tf=EXEC")

    # PATTERN events use the runtime anchor when provided; ENTRY/EXIT use cert_dt.
    p_last = max(_event_runtime_dt(e) for e in pattern_events)
    e_first = min(pd.Timestamp(e["cert_dt"]).tz_convert("UTC") for e in entry_events)
    x_first = min(pd.Timestamp(e["cert_dt"]).tz_convert("UTC") for e in exit_events)
    _ensure(p_last <= e_first, "PATTERN runtime/cert anchor must be <= ENTRY cert_dt")
    _ensure(e_first <= x_first, "ENTRY cert_dt must be <= EXIT cert_dt")


def _validate_trade(
    t: Any,
    seen_trade_ids: Set[int],
    *,
    tick_size: float,
    point_value: float,
    commission_per_trade: float,
    slippage_per_unit: float,
) -> None:
    """Check one trade: schema, scalar fields, MAE/MFE and PnL identities, events.

    *slippage_per_unit* is ``slippage_ticks * tick_size * point_value``; it is
    multiplied by the trade quantity to get the expected slippage.
    """
    _ensure(isinstance(t, dict), "Each trade must be dict.")
    tkeys = set(t.keys())
    miss_t = sorted(REQUIRED_TRADE_KEYS - tkeys)
    _ensure(not miss_t, f"Trade missing required keys: {miss_t}")
    extra_t = sorted(tkeys - REQUIRED_TRADE_KEYS)
    _ensure(not extra_t, f"Trade has forbidden extra keys: {extra_t}")
    _ensure(isinstance(t["meta"], dict), "trade.meta must be dict.")

    _ensure(isinstance(t["id"], int) and t["id"] > 0, "trade.id must be int > 0")
    _ensure(t["id"] not in seen_trade_ids, f"Duplicate trade.id: {t['id']}")
    seen_trade_ids.add(t["id"])

    _ensure(_is_member(t["side"], ALLOWED_SIDES), f"trade.side must be one of {sorted(ALLOWED_SIDES)}")
    _ensure(isinstance(t["qty"], (int, float)) and float(t["qty"]) > 0, "trade.qty must be > 0")

    _ensure(isinstance(t["entry_bar"], int) and t["entry_bar"] >= 0, "trade.entry_bar must be int >= 0")
    _ensure(isinstance(t["exit_bar"], int) and t["exit_bar"] >= 0, "trade.exit_bar must be int >= 0")
    _ensure(t["entry_bar"] < t["exit_bar"], "trade.entry_bar must be < trade.exit_bar")

    _ensure(_is_utc_iso_datetime(t["entry_dt"]), "trade.entry_dt must be UTC ISO-8601 string (...Z).")
    _ensure(_is_utc_iso_datetime(t["exit_dt"]), "trade.exit_dt must be UTC ISO-8601 string (...Z).")
    entry_dt = pd.Timestamp(t["entry_dt"]).tz_convert("UTC")
    exit_dt = pd.Timestamp(t["exit_dt"]).tz_convert("UTC")
    _ensure(entry_dt < exit_dt, "trade.entry_dt must be < trade.exit_dt")

    for pk in ["entry_price", "exit_price"]:
        _ensure(isinstance(t[pk], (int, float)) and float(t[pk]) > 0, f"trade.{pk} must be numeric > 0")

    spv = t["stop_price"]
    _ensure((spv is None) or isinstance(spv, (int, float)), "trade.stop_price must be numeric or null")

    for mk in ["mae_ticks", "mfe_ticks", "mae_points", "mfe_points", "mae_ccy_1c", "mfe_ccy_1c"]:
        _ensure(isinstance(t[mk], (int, float)), f"trade.{mk} must be numeric")
        _ensure(float(t[mk]) >= 0, f"trade.{mk} must be >= 0")

    _ensure(
        _float_close(float(t["mae_points"]), float(t["mae_ticks"]) * tick_size),
        "MAE identity failed: mae_points != mae_ticks * tick_size",
    )
    _ensure(
        _float_close(float(t["mfe_points"]), float(t["mfe_ticks"]) * tick_size),
        "MFE identity failed: mfe_points != mfe_ticks * tick_size",
    )
    _ensure(
        _float_close(float(t["mae_ccy_1c"]), float(t["mae_points"]) * point_value),
        "MAE currency identity failed: mae_ccy_1c != mae_points * point_value",
    )
    _ensure(
        _float_close(float(t["mfe_ccy_1c"]), float(t["mfe_points"]) * point_value),
        "MFE currency identity failed: mfe_ccy_1c != mfe_points * point_value",
    )

    for pk in ["pnl_gross", "commission", "slippage", "pnl_net"]:
        _ensure(isinstance(t[pk], (int, float)), f"trade.{pk} must be numeric")
    _ensure(float(t["commission"]) >= 0, "trade.commission must be >= 0")
    _ensure(float(t["slippage"]) >= 0, "trade.slippage must be >= 0")

    qty = float(t["qty"])
    commission_expected = commission_per_trade * qty
    slippage_expected = slippage_per_unit * qty
    _ensure(_float_close(float(t["commission"]), commission_expected, tol=1e-6), "commission mismatch")
    _ensure(_float_close(float(t["slippage"]), slippage_expected, tol=1e-6), "slippage mismatch")
    _ensure(
        _float_close(
            float(t["pnl_net"]),
            float(t["pnl_gross"]) - float(t["commission"]) - float(t["slippage"]),
            tol=1e-6,
        ),
        "pnl_net identity failed",
    )

    events = t["events"]
    _ensure(isinstance(events, list) and events, "trade.events must be non-empty list")
    seen_event_ids: Set[str] = set()
    for e in events:
        _validate_event(e, seen_event_ids)
    _validate_event_sequence(events)


def validate_output_strict(result: Dict[str, Any]) -> None:
    """Validate an EA output payload against the strict contract.

    Checks, in order: the top-level key set and scalar fields, ``build``,
    ``strategy_params``, ``pattern_registry``, the optional ``logic_state``
    and ``logic_occurrences`` blocks, then every trade with its events and
    their cross-reference to the registry.

    Args:
        result: the payload to validate.

    Raises:
        ValueError: on the first contract violation found.
    """
    _ensure(isinstance(result, dict), "Result must be a dict.")
    _validate_top_level(result)
    _validate_build(result["build"])
    variables, constants = _validate_strategy_params(result["strategy_params"])

    registry = result["pattern_registry"]
    _validate_pattern_registry(registry)
    if "logic_state" in result:
        _validate_logic_state(result["logic_state"], registry)
    if "logic_occurrences" in result:
        _validate_logic_occurrences(result["logic_occurrences"], registry, result["trades"])

    # Loop-invariant values, computed once for all trades.
    tick_size = float(constants["tick_size"])
    point_value = float(constants["point_value"])
    commission_per_trade = float(variables["commission_per_trade"])
    slippage_per_unit = float(variables["slippage_ticks"]) * tick_size * point_value
    registry_ids = _collect_registry_ids(registry)

    seen_trade_ids: Set[int] = set()
    for t in result["trades"]:
        _validate_trade(
            t,
            seen_trade_ids,
            tick_size=tick_size,
            point_value=point_value,
            commission_per_trade=commission_per_trade,
            slippage_per_unit=slippage_per_unit,
        )
        _crosscheck_events_vs_ids(t["events"], registry_ids)


def _validate_pattern_registry(reg: Dict[str, Any]) -> None:
    """Validate the ``pattern_registry`` block: schema, patterns, conditions, entries, exits.

    Raises:
        ValueError: on the first violation found.
    """
    _ensure(isinstance(reg, dict), "pattern_registry must be dict")
    keys = set(reg.keys())
    miss = sorted(REQUIRED_REGISTRY_KEYS - keys)
    _ensure(not miss, f"pattern_registry missing required keys: {miss}")
    extra = sorted(keys - REQUIRED_REGISTRY_KEYS)
    _ensure(not extra, f"pattern_registry has forbidden extra keys: {extra}")

    _ensure(
        isinstance(reg["registry_version"], str) and reg["registry_version"].strip(),
        "pattern_registry.registry_version must be non-empty string",
    )
    _ensure(
        reg["registry_version"] == CONTRACT_VERSION,
        "pattern_registry.registry_version must match contract version",
    )
    _ensure(isinstance(reg["parameters"], dict), "pattern_registry.parameters must be dict")
    _ensure(isinstance(reg["patterns"], list), "pattern_registry.patterns must be list")
    _ensure(isinstance(reg["meta"], dict), "pattern_registry.meta must be dict")

    reg_param_keys = set(reg["parameters"].keys())
    miss_reg_params = sorted(REQUIRED_REGISTRY_PARAMETER_KEYS - reg_param_keys)
    _ensure(not miss_reg_params, f"pattern_registry.parameters missing keys: {miss_reg_params}")

    for p in reg["patterns"]:
        _ensure(isinstance(p, dict), "Each pattern must be dict")
        pkeys = set(p.keys())
        miss_p = sorted(REQUIRED_PATTERN_KEYS - pkeys)
        _ensure(not miss_p, f"pattern missing keys: {miss_p}")
        extra_p = sorted(pkeys - REQUIRED_PATTERN_KEYS)
        _ensure(not extra_p, f"pattern has extra keys: {extra_p}")

        _ensure(isinstance(p["pattern_id"], str) and p["pattern_id"].strip(), "pattern.pattern_id must be non-empty string")
        _ensure(isinstance(p["pattern_symbol"], str) and p["pattern_symbol"].strip(), "pattern.pattern_symbol must be non-empty string")
        _ensure(isinstance(p["title"], str), "pattern.title must be string")
        _ensure(isinstance(p["summary"], str), "pattern.summary must be string")
        _ensure(isinstance(p["completion_rule"], str) and p["completion_rule"].strip(), "pattern.completion_rule must be non-empty string")
        _ensure(isinstance(p["conditions"], list), "pattern.conditions must be list")
        _ensure(isinstance(p["entries"], list), "pattern.entries must be list")
        _ensure(isinstance(p["exits"], list), "pattern.exits must be list")
        _ensure(isinstance(p["pattern_tf"], str) and p["pattern_tf"] in DB_TIMEFRAMES, "pattern.pattern_tf must be canonical DB timeframe")
        _ensure(isinstance(p["mre"], str) and p["mre"] in DB_TIMEFRAMES, "pattern.mre must be canonical DB timeframe")

        condition_tfs = []
        for c in p["conditions"]:
            _ensure(isinstance(c, dict), "condition must be dict")
            ckeys = set(c.keys())
            miss_c = sorted(REQUIRED_CONDITION_KEYS - ckeys)
            _ensure(not miss_c, f"condition missing keys: {miss_c}")
            extra_c = sorted(ckeys - REQUIRED_CONDITION_KEYS)
            _ensure(not extra_c, f"condition has extra keys: {extra_c}")
            _ensure(isinstance(c["condition_id"], str) and c["condition_id"].strip(), "condition_id must be non-empty string")
            _ensure(isinstance(c["tf"], str) and c["tf"] in DB_TIMEFRAMES, f"condition.tf must be canonical DB timeframe: {c.get('tf')}")
            condition_tfs.append(c["tf"])

        _ensure(bool(condition_tfs), f"pattern {p['pattern_id']} must contain at least one condition")
        # NOTE(review): the minimum rank is the COARSEST condition timeframe
        # (rank 0 is "Y"), while the message below says "smallest". Confirm the
        # intended rule against the payload contract before changing either.
        derived_pattern_tf = min(condition_tfs, key=_timeframe_rank)
        _ensure(
            p["pattern_tf"] == derived_pattern_tf,
            f"pattern_tf must match smallest condition tf for pattern {p['pattern_id']}",
        )
        # A higher rank is a finer timeframe: MRE may equal pattern_tf or be finer.
        _ensure(
            _timeframe_rank(p["mre"]) >= _timeframe_rank(p["pattern_tf"]),
            f"pattern.mre cannot be more coarse than pattern_tf for pattern {p['pattern_id']}",
        )

        for e in p["entries"]:
            _ensure(isinstance(e, dict), "entry must be dict")
            ekeys = set(e.keys())
            miss_e = sorted(REQUIRED_ENTRY_KEYS - ekeys)
            _ensure(not miss_e, f"entry missing keys: {miss_e}")
            extra_e = sorted(ekeys - REQUIRED_ENTRY_KEYS)
            _ensure(not extra_e, f"entry has extra keys: {extra_e}")
            _ensure(isinstance(e["entry_id"], str) and e["entry_id"].strip(), "entry_id must be non-empty string")
            _ensure(e["tf"] == "EXEC", f"entry.tf must be EXEC for entry {e['entry_id']}")

        for x in p["exits"]:
            _ensure(isinstance(x, dict), "exit must be dict")
            xkeys = set(x.keys())
            miss_x = sorted(REQUIRED_EXIT_KEYS - xkeys)
            _ensure(not miss_x, f"exit missing keys: {miss_x}")
            extra_x = sorted(xkeys - REQUIRED_EXIT_KEYS)
            _ensure(not extra_x, f"exit has extra keys: {extra_x}")
            _ensure(isinstance(x["exit_id"], str) and x["exit_id"].strip(), "exit_id must be non-empty string")
            _ensure(x["tf"] == "EXEC", f"exit.tf must be EXEC for exit {x['exit_id']}")


def _validate_logic_state(logic_state: Dict[str, Any], reg: Dict[str, Any]) -> None:
    """Validate the optional ``logic_state`` block against its schema and the registry.

    CONDITION and PATTERN series must reference ids present in *reg*; points
    must be boolean and strictly increasing by ``dt``.

    Raises:
        ValueError: on the first violation found.
    """
    _ensure(isinstance(logic_state, dict), "logic_state must be dict when present")
    keys = set(logic_state.keys())
    missing = sorted(REQUIRED_LOGIC_STATE_KEYS - keys)
    _ensure(not missing, f"logic_state missing required keys: {missing}")
    extra = sorted(keys - REQUIRED_LOGIC_STATE_KEYS)
    _ensure(not extra, f"logic_state has forbidden extra keys: {extra}")
    _ensure(
        logic_state["schema_version"] == LOGIC_STATE_SCHEMA_VERSION,
        f"logic_state.schema_version must be {LOGIC_STATE_SCHEMA_VERSION!r}",
    )
    _ensure(isinstance(logic_state["series"], list), "logic_state.series must be list")
    _ensure(isinstance(logic_state["meta"], dict), "logic_state.meta must be dict")

    registry_ids = _collect_registry_ids(reg)
    pattern_ids = registry_ids["PATTERN"]
    condition_ids = registry_ids["CONDITION"]

    seen_state_keys: Set[tuple[str, str]] = set()
    for series in logic_state["series"]:
        _ensure(isinstance(series, dict), "Each logic_state series must be dict")
        skeys = set(series.keys())
        miss = sorted(REQUIRED_LOGIC_STATE_SERIES_KEYS - skeys)
        _ensure(not miss, f"logic_state series missing required keys: {miss}")
        extra = sorted(skeys - REQUIRED_LOGIC_STATE_SERIES_KEYS)
        _ensure(not extra, f"logic_state series has forbidden extra keys: {extra}")

        _ensure(isinstance(series["state_id"], str) and series["state_id"].strip(), "logic_state.state_id must be non-empty string")
        _ensure(_is_member(series["state_type"], ALLOWED_LOGIC_STATE_TYPES), f"logic_state.state_type invalid: {series['state_type']!r}")
        _ensure(_is_member(series["state_tf"], CANONICAL_TIMEFRAMES), f"logic_state.state_tf must be canonical token: {series['state_tf']!r}")
        _ensure(
            isinstance(series["visual_preset"], str) and series["visual_preset"] in ALLOWED_VISUAL_PRESETS,
            f"logic_state.visual_preset invalid: {series['visual_preset']!r}",
        )
        _ensure(isinstance(series["points"], list), "logic_state.points must be list")
        _ensure(isinstance(series["meta"], dict), "logic_state.series.meta must be dict")

        state_key = (str(series["state_type"]), str(series["state_id"]))
        _ensure(state_key not in seen_state_keys, f"Duplicate logic_state series for {state_key}")
        seen_state_keys.add(state_key)

        if series["state_type"] == "CONDITION":
            _ensure(series["state_id"] in condition_ids, f"logic_state CONDITION state_id not in pattern_registry.conditions: {series['state_id']}")
        elif series["state_type"] == "PATTERN":
            _ensure(series["state_id"] in pattern_ids, f"logic_state PATTERN state_id not in pattern_registry.patterns: {series['state_id']}")

        prev_dt: Optional[pd.Timestamp] = None
        for point in series["points"]:
            _ensure(isinstance(point, dict), "Each logic_state point must be dict")
            pkeys = set(point.keys())
            miss_point = sorted(REQUIRED_LOGIC_STATE_POINT_KEYS - pkeys)
            _ensure(not miss_point, f"logic_state point missing required keys: {miss_point}")
            extra_point = sorted(pkeys - REQUIRED_LOGIC_STATE_POINT_KEYS)
            _ensure(not extra_point, f"logic_state point has forbidden extra keys: {extra_point}")
            _ensure(_is_utc_iso_datetime(point["dt"]), "logic_state point.dt must be UTC ISO-8601 string (...Z)")
            _ensure(isinstance(point["value"], bool), "logic_state point.value must be boolean")
            current_dt = pd.Timestamp(point["dt"]).tz_convert("UTC")
            if prev_dt is not None:
                _ensure(current_dt > prev_dt, "logic_state points must be strictly increasing by dt")
            prev_dt = current_dt


def _validate_logic_occurrences(
    logic_occurrences: Dict[str, Any],
    reg: Dict[str, Any],
    trades: list[Dict[str, Any]],
) -> None:
    """Validate the optional ``logic_occurrences`` block.

    Items must match the item schema, reference registry entities (except
    POSITION, which is not cross-checked), reference known trade ids and
    depend only on occurrence ids present in the block.

    Raises:
        ValueError: on the first violation found.
    """
    _ensure(isinstance(logic_occurrences, dict), "logic_occurrences must be dict when present")
    keys = set(logic_occurrences.keys())
    missing = sorted(REQUIRED_LOGIC_OCCURRENCES_KEYS - keys)
    _ensure(not missing, f"logic_occurrences missing required keys: {missing}")
    extra = sorted(keys - REQUIRED_LOGIC_OCCURRENCES_KEYS)
    _ensure(not extra, f"logic_occurrences has forbidden extra keys: {extra}")
    _ensure(
        logic_occurrences["schema_version"] == LOGIC_OCCURRENCES_SCHEMA_VERSION,
        f"logic_occurrences.schema_version must be {LOGIC_OCCURRENCES_SCHEMA_VERSION!r}",
    )
    _ensure(isinstance(logic_occurrences["items"], list), "logic_occurrences.items must be list")
    _ensure(isinstance(logic_occurrences["meta"], dict), "logic_occurrences.meta must be dict")

    registry_ids = _collect_registry_ids(reg)
    # This runs before the trades themselves are validated, so malformed
    # trades are skipped here rather than raising KeyError/TypeError.
    trade_ids = {
        t["id"] for t in trades
        if isinstance(t, dict) and isinstance(t.get("id"), int)
    }

    items = logic_occurrences["items"]
    seen_occurrence_ids: Set[str] = set()
    for item in items:
        _ensure(isinstance(item, dict), "Each logic_occurrences item must be dict")
        ikeys = set(item.keys())
        miss = sorted(REQUIRED_LOGIC_OCCURRENCE_ITEM_KEYS - ikeys)
        _ensure(not miss, f"logic_occurrence item missing required keys: {miss}")
        extra = sorted(ikeys - REQUIRED_LOGIC_OCCURRENCE_ITEM_KEYS)
        _ensure(not extra, f"logic_occurrence item has forbidden extra keys: {extra}")

        occurrence_id = item["occurrence_id"]
        _ensure(isinstance(occurrence_id, str) and occurrence_id.strip(), "logic_occurrence.occurrence_id must be non-empty string")
        _ensure(occurrence_id not in seen_occurrence_ids, f"Duplicate logic_occurrence.occurrence_id: {occurrence_id!r}")
        seen_occurrence_ids.add(occurrence_id)

        entity_type = item["entity_type"]
        entity_id = item["entity_id"]
        _ensure(_is_member(entity_type, ALLOWED_LOGIC_OCCURRENCE_TYPES), f"logic_occurrence.entity_type invalid: {entity_type!r}")
        _ensure(isinstance(entity_id, str) and entity_id.strip(), "logic_occurrence.entity_id must be non-empty string")
        registry_label = _OCCURRENCE_REGISTRY_LABELS.get(entity_type)
        if registry_label is not None:
            _ensure(
                entity_id in registry_ids[entity_type],
                f"logic_occurrence.entity_id not found in registry {registry_label}: {entity_id!r}",
            )

        _ensure(isinstance(item["logic_label_short"], str) and item["logic_label_short"].strip(), "logic_occurrence.logic_label_short must be non-empty string")
        _ensure(isinstance(item["logic_label_long"], str) and item["logic_label_long"].strip(), "logic_occurrence.logic_label_long must be non-empty string")
        _ensure(_is_member(item["semantic_role"], ALLOWED_LOGIC_OCCURRENCE_ROLES), f"logic_occurrence.semantic_role invalid: {item['semantic_role']!r}")
        _ensure(_is_member(item["status"], ALLOWED_LOGIC_OCCURRENCE_STATUS), f"logic_occurrence.status invalid: {item['status']!r}")
        _ensure(_is_member(item["occurrence_tf"], CANONICAL_TIMEFRAMES), f"logic_occurrence.occurrence_tf must be canonical token: {item['occurrence_tf']!r}")
        _ensure(_is_utc_iso_datetime(item["start_dt"]), f"logic_occurrence.start_dt must be UTC ISO-8601 (...Z): {item['start_dt']!r}")
        _ensure(_is_utc_iso_datetime(item["cert_dt"]), f"logic_occurrence.cert_dt must be UTC ISO-8601 (...Z): {item['cert_dt']!r}")
        _ensure(isinstance(item["start_price"], (int, float)), "logic_occurrence.start_price must be numeric")
        _ensure(isinstance(item["cert_price"], (int, float)), "logic_occurrence.cert_price must be numeric")
        _ensure(isinstance(item["depends_on_occurrence_ids"], list), "logic_occurrence.depends_on_occurrence_ids must be list")
        _ensure(isinstance(item["used_by_trade_ids"], list), "logic_occurrence.used_by_trade_ids must be list")
        _ensure(isinstance(item["meta"], dict), "logic_occurrence.meta must be dict")

        start_dt = pd.Timestamp(item["start_dt"]).tz_convert("UTC")
        cert_dt = pd.Timestamp(item["cert_dt"]).tz_convert("UTC")
        _ensure(start_dt <= cert_dt, "logic_occurrence.start_dt must be <= cert_dt")

        for dep_id in item["depends_on_occurrence_ids"]:
            _ensure(isinstance(dep_id, str) and dep_id.strip(), "logic_occurrence.depends_on_occurrence_ids items must be non-empty strings")
        for trade_id in item["used_by_trade_ids"]:
            _ensure(isinstance(trade_id, int), "logic_occurrence.used_by_trade_ids items must be int")
            _ensure(trade_id in trade_ids, f"logic_occurrence.used_by_trade_ids references unknown trade.id: {trade_id}")

    # Dependencies are resolved only after every id is known, so an item may
    # depend on an occurrence that appears later in the list.
    for item in items:
        for dep_id in item["depends_on_occurrence_ids"]:
            _ensure(dep_id in seen_occurrence_ids, f"logic_occurrence depends on unknown occurrence_id: {dep_id!r}")


def _timeframe_rank(tf: str) -> int:
    """Return the rank of a DB timeframe token: 0 for "Y" (coarsest) up to 14 for "S1".

    Raises:
        ValueError: if the upper-cased token is not a rankable timeframe.
    """
    token = str(tf or "").upper()
    _ensure(token in _TF_RANK, f"Unsupported timeframe rank token: {tf!r}")
    return _TF_RANK[token]


def _crosscheck_events_vs_ids(events: list, registry_ids: Dict[str, Set[str]]) -> None:
    """Check that each event id exists among the registry ids of its event type."""
    for ev in events:
        et = ev["event_type"]
        if not isinstance(et, str) or et not in _EVENT_REGISTRY_FIELDS:
            continue
        eid = ev["event_id"]
        _ensure(
            eid in registry_ids[et],
            f"Event id not found in pattern_registry.{_EVENT_REGISTRY_FIELDS[et]}: {eid}",
        )


def _crosscheck_events_vs_registry(events: list, reg: Dict[str, Any]) -> None:
    """Check that every event id in *events* is declared in the registry *reg*.

    Raises:
        ValueError: if an event id is not found for its event type.
    """
    _crosscheck_events_vs_ids(events, _collect_registry_ids(reg))


def _build_close_map(df: pd.DataFrame, label: str) -> Dict[pd.Timestamp, float]:
    """Map each UTC bar timestamp of *df* to its numeric close.

    Rows whose ``dt`` or ``close`` cannot be parsed are dropped.

    Raises:
        ValueError: if the ``dt`` or ``close`` column is missing.
    """
    frame = df.copy()
    frame["__dt_utc__"] = _df_dt_to_utc_series(frame)
    _ensure("close" in frame.columns, f"{label} must have column 'close'")
    frame["close"] = pd.to_numeric(frame["close"], errors="coerce")
    frame = frame.dropna(subset=["__dt_utc__", "close"])
    return {pd.Timestamp(x): float(c) for x, c in zip(frame["__dt_utc__"], frame["close"])}


def validate_output_with_data(
    result: Dict[str, Any],
    df_exec: pd.DataFrame,
    pattern_contexts: Optional[Dict[str, Dict[str, Any]]] = None,
) -> None:
    """Strict validation + deep price validation using df_exec and pattern_contexts.

    After ``validate_output_strict`` passes, every event ``start_price`` and
    ``cert_price`` is compared with the bar close found at the event time. The
    lookup uses the pattern data for that timeframe when supplied, else *df_exec*.

    Args:
        result: the payload to validate.
        df_exec: execution-timeframe bars with ``dt`` and ``close`` columns.
        pattern_contexts: optional mapping whose values may carry a
            ``dfs_by_tf`` dict of timeframe token -> bars DataFrame.

    Raises:
        ValueError: on any contract violation or price mismatch.
    """
    validate_output_strict(result)
    _ensure(df_exec is not None and not df_exec.empty, "df_exec must not be empty for deep validation.")

    exec_close = _build_close_map(df_exec, "df_exec")
    exec_bar_dur = _parse_tf_to_offset(result["timeframe"])

    p_close: Dict[str, Dict[pd.Timestamp, float]] = {}
    for context in (pattern_contexts or {}).values():
        if not isinstance(context, dict):
            continue
        dfs_by_tf = context.get("dfs_by_tf")
        if not isinstance(dfs_by_tf, dict):
            continue
        for tf, dfp in dfs_by_tf.items():
            tf_token = str(tf)
            if tf_token in p_close:
                continue  # the first context providing a timeframe wins
            _ensure(dfp is not None and not dfp.empty, f"pattern_contexts dfs_by_tf[{tf_token}] must not be empty.")
            p_close[tf_token] = _build_close_map(dfp, f"pattern_contexts dfs_by_tf[{tf_token}]")

    for t in result["trades"]:
        for e in t["events"]:
            sdt = pd.Timestamp(e["start_dt"]).tz_convert("UTC")
            sp = float(e["start_price"])
            tf = str(e["tf"])
            if tf != "EXEC" and tf in p_close:
                tf_dur = _parse_tf_to_offset(tf)
                expected_sp = _lookup_close_for_event(e, p_close[tf], sdt, tf_dur, f"start_dt tf={tf}")
            else:
                expected_sp = _lookup_close_at_event_time(exec_close, sdt, exec_bar_dur, "start_dt execution")
            _ensure(_float_close(expected_sp, sp, tol=1e-6), f"start_price mismatch tf={tf} dt={sdt}")

            cdt = pd.Timestamp(e["cert_dt"]).tz_convert("UTC")
            cp = float(e["cert_price"])
            ctf = str(e["cert_tf"])
            if ctf != "EXEC" and ctf in p_close:
                ctf_dur = _parse_tf_to_offset(ctf)
                expected_cp = _lookup_close_for_event(e, p_close[ctf], cdt, ctf_dur, f"cert_dt tf={ctf}")
            else:
                expected_cp = _lookup_close_at_event_time(exec_close, cdt, exec_bar_dur, "cert_dt execution")
            _ensure(_float_close(expected_cp, cp, tol=1e-6), f"cert_price mismatch tf={ctf} dt={cdt}")