from __future__ import annotations import re from pathlib import Path from typing import Dict, Optional import psycopg2 import psycopg2.extras from uv_app.core.mssql import connect_to_mssql from uv_app.core.pgsql import connect_to_pgsql QUERY_PATH = Path(__file__).with_name("routeriai_query.sql") CLIENT_CODE = "101460" def _read_query() -> str: return QUERY_PATH.read_text(encoding="utf-8") def _clean_text(value: object) -> Optional[str]: if value is None: return None text = value.decode(errors="ignore") if isinstance(value, bytes) else str(value) text = text.replace("\u00a0", " ") text = re.sub(r"[\x00-\x1f\x7f\u2028\u2029\u0085]", " ", text) return re.sub(r"\s+", " ", text).strip() def _format_diff_value(value: str) -> str: return ( value.replace("\r", "\\r") .replace("\n", "\\n") .replace("\t", "\\t") .replace("\u2028", "\\u2028") .replace("\u2029", "\\u2029") .replace("\u0085", "\\u0085") ) def _dump_string(label: str, value: object) -> None: if value is None: print(f"{label}: None") return text = value.decode(errors="ignore") if isinstance(value, bytes) else str(value) print(f"{label} raw repr: {text!r}") print(f"{label} raw escaped: {_format_diff_value(text)}") cleaned = _clean_text(text) print(f"{label} cleaned repr: {cleaned!r}") print(f"{label} cleaned escaped: {_format_diff_value(cleaned or '')}") codepoints = [f"U+{ord(ch):04X}" for ch in text] print(f"{label} codepoints: {' '.join(codepoints)}") def _fetch_pg_client() -> Optional[Dict[str, object]]: conn = connect_to_pgsql() if conn is None: raise RuntimeError("Failed to connect to PostgreSQL.") try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: cursor.execute(_read_query()) rows = cursor.fetchall() finally: conn.close() for row in rows: if str(row.get("client_code") or "").strip() == CLIENT_CODE: return row return None def _fetch_mssql_client() -> Optional[Dict[str, object]]: conn = connect_to_mssql() if conn is None: raise RuntimeError("Failed to connect to MSSQL.") try: query = """ SELECT N08_KODAS_KS, N08_PAV FROM dbo.N08_KLIJ WHERE N08_KODAS_KS = ? """ cursor = conn.cursor() cursor.execute(query, (CLIENT_CODE,)) row = cursor.fetchone() if not row: return None columns = [c[0] for c in cursor.description] return dict(zip(columns, row)) finally: conn.close() def main() -> None: pg_row = _fetch_pg_client() if not pg_row: print("Client not found in PostgreSQL.") else: print("PostgreSQL data:") _dump_string("PG name", pg_row.get("name")) mssql_row = _fetch_mssql_client() if not mssql_row: print("Client not found in MSSQL.") else: print("MSSQL data:") _dump_string("MSSQL N08_PAV", mssql_row.get("N08_PAV")) if __name__ == "__main__": main()