Create infra scripts

This commit is contained in:
2026-02-02 16:38:16 +02:00
parent dd5c88e3bb
commit 68382d62a1
5 changed files with 425 additions and 4 deletions

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import re
from pathlib import Path
from typing import Dict, Optional
import psycopg2
import psycopg2.extras
from uv_app.core.mssql import connect_to_mssql
from uv_app.core.pgsql import connect_to_pgsql
QUERY_PATH = Path(__file__).with_name("routeriai_query.sql")
CLIENT_CODE = "101460"
def _read_query() -> str:
return QUERY_PATH.read_text(encoding="utf-8")
def _clean_text(value: object) -> Optional[str]:
if value is None:
return None
text = value.decode(errors="ignore") if isinstance(value, bytes) else str(value)
text = text.replace("\u00a0", " ")
text = re.sub(r"[\x00-\x1f\x7f\u2028\u2029\u0085]", " ", text)
return re.sub(r"\s+", " ", text).strip()
def _format_diff_value(value: str) -> str:
return (
value.replace("\r", "\\r")
.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("\u2028", "\\u2028")
.replace("\u2029", "\\u2029")
.replace("\u0085", "\\u0085")
)
def _dump_string(label: str, value: object) -> None:
if value is None:
print(f"{label}: None")
return
text = value.decode(errors="ignore") if isinstance(value, bytes) else str(value)
print(f"{label} raw repr: {text!r}")
print(f"{label} raw escaped: {_format_diff_value(text)}")
cleaned = _clean_text(text)
print(f"{label} cleaned repr: {cleaned!r}")
print(f"{label} cleaned escaped: {_format_diff_value(cleaned or '')}")
codepoints = [f"U+{ord(ch):04X}" for ch in text]
print(f"{label} codepoints: {' '.join(codepoints)}")
def _fetch_pg_client() -> Optional[Dict[str, object]]:
conn = connect_to_pgsql()
if conn is None:
raise RuntimeError("Failed to connect to PostgreSQL.")
try:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
cursor.execute(_read_query())
rows = cursor.fetchall()
finally:
conn.close()
for row in rows:
if str(row.get("client_code") or "").strip() == CLIENT_CODE:
return row
return None
def _fetch_mssql_client() -> Optional[Dict[str, object]]:
conn = connect_to_mssql()
if conn is None:
raise RuntimeError("Failed to connect to MSSQL.")
try:
query = """
SELECT
N08_KODAS_KS,
N08_PAV
FROM dbo.N08_KLIJ
WHERE N08_KODAS_KS = ?
"""
cursor = conn.cursor()
cursor.execute(query, (CLIENT_CODE,))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
finally:
conn.close()
def main() -> None:
pg_row = _fetch_pg_client()
if not pg_row:
print("Client not found in PostgreSQL.")
else:
print("PostgreSQL data:")
_dump_string("PG name", pg_row.get("name"))
mssql_row = _fetch_mssql_client()
if not mssql_row:
print("Client not found in MSSQL.")
else:
print("MSSQL data:")
_dump_string("MSSQL N08_PAV", mssql_row.get("N08_PAV"))
if __name__ == "__main__":
main()