from __future__ import annotations from pathlib import Path from typing import Dict, Iterable, Optional import psycopg2 import psycopg2.extras from dotenv import load_dotenv from uv_app.core.mssql import connect_to_mssql from uv_app.core.pgsql import connect_to_pgsql MIN_YEAR = 2000 OPEN_END_DATE = "3999-01-01" SERVICE_CODE = "STATUSAS" DOTENV_PATH = Path(__file__).resolve().parents[2] / ".env" load_dotenv(DOTENV_PATH, override=True) QUERY_PATH = Path(__file__).with_name("bukle_by_sutarties_kodas.sql") def _read_query() -> str: return QUERY_PATH.read_text(encoding="utf-8") def _fetch_pg_rows(sutarties_kodas: str) -> Iterable[Dict[str, object]]: conn = connect_to_pgsql() if conn is None: raise RuntimeError("Failed to connect to PostgreSQL.") try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: cursor.execute(_read_query(), (sutarties_kodas,)) return cursor.fetchall() finally: conn.close() def _fetch_mssql_appendices(contract_code: str) -> list[dict[str, object]]: conn = connect_to_mssql() if conn is None: raise RuntimeError("Failed to connect to MSSQL.") try: cursor = conn.cursor() cursor.execute( """ SELECT N52_KODAS_KT, N52_KODAS_K0, N52_DOK_NR, N52_KODAS, CONVERT(date, N52_BEG_DATE) AS N52_BEG_DATE, CONVERT(date, N52_END_DATE) AS N52_END_DATE, N52_VISKAS FROM dbo.N52_SUTD WHERE N52_KODAS_KT = ? AND N52_KODAS = ? """, (contract_code, SERVICE_CODE), ) rows = cursor.fetchall() columns = [c[0] for c in cursor.description] return [dict(zip(columns, row)) for row in rows] finally: conn.close() def _normalize_date(value: object) -> str: if value is None: return "" return str(value) def _build_periods(rows: list[dict[str, object]]) -> list[dict[str, object]]: normalized = [] for row in rows: start = row.get("data") if start is None or str(start) < f"{MIN_YEAR}-01-01": start = f"{MIN_YEAR}-01-01" normalized.append( { "start": start, "bukle": int(row["bukle"]), } ) normalized.sort(key=lambda item: str(item["start"])) periods = [] for idx, item in enumerate(normalized): start_date = str(item["start"]) if idx == len(normalized) - 1: end_date = OPEN_END_DATE else: next_start = normalized[idx + 1]["start"] end_date = str(next_start - date.resolution) active = item["bukle"] in {2, 3} periods.append( { "start": start_date, "end": end_date, "active": active, "k0": f"{SERVICE_CODE}-{idx + 1}", "dok_nr": str(idx + 1), } ) return periods def _explain_mismatch(existing: list[dict[str, object]], desired: list[dict[str, object]]) -> None: print("=== Desired (from PGSQL) ===") for row in desired: print(row) print("=== Existing (from MSSQL) ===") for row in existing: print( { "k0": str(row.get("N52_KODAS_K0") or "").strip(), "dok_nr": str(row.get("N52_DOK_NR") or "").strip(), "start": _normalize_date(row.get("N52_BEG_DATE")), "end": _normalize_date(row.get("N52_END_DATE")), "active": int(row.get("N52_VISKAS") or 0) == 1, } ) existing_keys = { ( _normalize_date(row.get("N52_BEG_DATE")), _normalize_date(row.get("N52_END_DATE")), int(row.get("N52_VISKAS") or 0) == 1, str(row.get("N52_KODAS_K0") or "").strip(), str(row.get("N52_DOK_NR") or "").strip(), ) for row in existing } desired_keys = { ( row["start"], row["end"], row["active"], row["k0"], row["dok_nr"], ) for row in desired } print("=== Missing in MSSQL ===") for key in sorted(desired_keys - existing_keys): print(key) print("=== Extra in MSSQL ===") for key in sorted(existing_keys - desired_keys): print(key) def main() -> None: sutarties_kodas = input("Sutarties kodas: ").strip() if not sutarties_kodas: print("Missing sutarties kodas.") return rows = list(_fetch_pg_rows(sutarties_kodas)) if not rows: print("No bukle rows found in PGSQL.") return contract_code = f"SUT-{sutarties_kodas}" existing = _fetch_mssql_appendices(contract_code) desired = _build_periods(rows) _explain_mismatch(existing, desired) if __name__ == "__main__": main()