from __future__ import annotations import os from datetime import date, datetime from pathlib import Path from typing import Dict, Iterable, Optional, Tuple import psycopg2 import psycopg2.extras import requests from dotenv import load_dotenv from uv_app.core.mssql import connect_to_mssql from uv_app.core.pgsql import connect_to_pgsql BASE_URL = "https://api.manorivile.lt/client/v2" TIMEOUT = 30 QUERY_PATH = Path(__file__).with_name("routeriai_query.sql") DOTENV_PATH = Path(__file__).resolve().parents[2] / ".env" # Force reload environment variables from repo .env, ignoring system vars. load_dotenv(DOTENV_PATH, override=True) def _post(api_key: str, payload: dict) -> tuple[dict, int]: headers = { "ApiKey": api_key, "Content-Type": "application/json", "Accept": "application/json", } response = requests.post( BASE_URL, json=payload, headers=headers, timeout=TIMEOUT, ) status_code = response.status_code if status_code != 200: raise RuntimeError(f"Rivile HTTP {status_code}: {response.text}") data = response.json() if "errorMessage" in data: raise RuntimeError(f"Rivile API error: {data}") return data, status_code def _get_api_key() -> str: api_key = os.getenv("RIVILE_API_KEY", "").strip() if not api_key: raise RuntimeError("Missing RIVILE_API_KEY environment variable.") return api_key def _get_user_from_key(api_key: str) -> str: return api_key.split(".", 1)[0] def _fetch_mssql_client( conn: "pyodbc.Connection", client_code: str, ) -> Optional[Dict[str, object]]: query = """ SELECT N08_KODAS_KS, N08_PAV, N08_ADR, N08_E_MAIL, N08_ADD_DATE, N08_MOB_TEL, N08_IM_KODAS, N08_PVM_KODAS, N08_ADDUSR, N08_TIPAS, N08_KODAS_DS, N08_BUSENA, N08_PVM_SKOL_T, N08_PVM_SKOL_P, N08_SKOL_SUD, N08_WEB_POZT, N08_KODAS_XS_T, N08_KODAS_XS_P, N08_RUSIS, N08_R_DATE FROM dbo.N08_KLIJ WHERE N08_KODAS_KS = ? """ cursor = conn.cursor() cursor.execute(query, (client_code,)) row = cursor.fetchone() if not row: return None columns = [c[0] for c in cursor.description] return dict(zip(columns, row)) def _normalize_person_type(value: Optional[object]) -> str: if value is None: return "1" cleaned = str(value).strip() if cleaned == "1": return "1" if cleaned == "0": return "2" return "1" def _build_n08_payload(row: Dict[str, object]) -> Dict[str, object]: mobile = (row.get("mobile_phone") or "").strip() phone = (row.get("phone") or "").strip() contact_phone = mobile or phone company_code = row.get("company_code") if company_code is None or str(company_code).strip() == "": company_code = "ND" else: company_code = str(company_code).strip() vat_code = row.get("vat_code") if vat_code is None or str(vat_code).strip() == "": vat_code = "ND" else: vat_code = str(vat_code).strip() creation_date = row.get("creation_date") if isinstance(creation_date, (date, datetime)): creation_date = creation_date.isoformat() return { "N08_KODAS_KS": row.get("client_code"), "N08_PAV": row.get("name"), "N08_ADR": row.get("address"), "N08_E_MAIL": row.get("email"), "N08_ADD_DATE": creation_date, "N08_MOB_TEL": contact_phone, "N08_IM_KODAS": company_code, "N08_PVM_KODAS": vat_code, "N08_ADDUSR": row.get("ucreated"), "N08_TIPAS": _normalize_person_type(row.get("person_type")), "N08_KODAS_DS": "PT001", "N08_BUSENA": "1", "N08_PVM_SKOL_T": "1", "N08_PVM_SKOL_P": "1", "N08_SKOL_SUD": "3", "N08_WEB_POZT": "1", "N08_KODAS_XS_T": "NEPVM", "N08_KODAS_XS_P": "NEPVM", "N08_RUSIS": "1", "N08_R_DATE": datetime.now().isoformat(timespec="seconds"), } def _normalize_value(value: object) -> str: if value is None: return "" if isinstance(value, (date, datetime)): return value.isoformat().strip() if isinstance(value, bytes): return value.decode(errors="ignore").strip() return str(value).strip() def _diff_fields( existing: Optional[Dict[str, object]], desired: Dict[str, object], ) -> Dict[str, Tuple[str, str]]: if not existing: return {key: ("", _normalize_value(value)) for key, value in desired.items()} changes: Dict[str, Tuple[str, str]] = {} for key, desired_value in desired.items(): existing_value = _normalize_value(existing.get(key)) normalized_desired = _normalize_value(desired_value) if existing_value != normalized_desired: changes[key] = (existing_value, normalized_desired) return changes def _upsert_client( api_key: str, mssql_conn: "pyodbc.Connection", row: Dict[str, object], index: int, total: int, ) -> None: client_code = str(row.get("client_code") or "").strip() if not client_code: raise RuntimeError("Missing client_code in source row.") n08 = _build_n08_payload(row) user = _get_user_from_key(api_key) existing = _fetch_mssql_client(mssql_conn, client_code) if existing: existing_web = _normalize_value(existing.get("N08_WEB_POZT")) if existing_web: n08["N08_WEB_POZT"] = existing_web changes = _diff_fields(existing, n08) if existing: if not changes or set(changes.keys()) == {"N08_R_DATE"}: print(f"No changes for client: {client_code} ({index}/{total})") return print(f"Updating client: {client_code} ({index}/{total})") for field, (old, new) in changes.items(): print(f" {field}: '{old}' -> '{new}'") payload = { "method": "EDIT_N08", "params": { "oper": "U", "user": user, "fld": "N08_KODAS_KS", "val": client_code, }, "data": {"N08": n08}, } else: print(f"Creating client: {client_code} ({index}/{total})") for field, (_, new) in changes.items(): print(f" {field}: '' -> '{new}'") payload = { "method": "EDIT_N08_FULL", "params": {"oper": "I", "user": user}, "data": {"N08": n08}, } _, status_code = _post(api_key, payload) print(f" Rivile response status: {status_code}") def _read_query() -> str: return QUERY_PATH.read_text(encoding="utf-8") def _fetch_clients() -> Iterable[Dict[str, object]]: conn = connect_to_pgsql() if conn is None: raise RuntimeError("Failed to connect to PostgreSQL.") try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: cursor.execute(_read_query()) return cursor.fetchall() finally: conn.close() def main() -> None: api_key = _get_api_key() rows = list(_fetch_clients()) mssql_conn = connect_to_mssql() if mssql_conn is None: raise RuntimeError("Failed to connect to MSSQL.") try: total = len(rows) for index, row in enumerate(rows, start=1): _upsert_client(api_key, mssql_conn, row, index, total) finally: mssql_conn.close() if __name__ == "__main__": main()