Files
migrations/uv_app/user/insert_clients.py
2026-01-30 15:45:43 +02:00

242 lines
6.7 KiB
Python

from __future__ import annotations
import os
from datetime import date, datetime
from pathlib import Path
from typing import Dict, Iterable, Optional, Tuple
import psycopg2
import psycopg2.extras
import requests
from dotenv import find_dotenv, load_dotenv
from uv_app.core.mssql import connect_to_mssql
from uv_app.core.pgsql import connect_to_pgsql
# Endpoint that every Rivile API request in this module is POSTed to.
BASE_URL = "https://api.manorivile.lt/client/v2"
# Value passed as ``timeout=`` to ``requests.post`` for each API call.
TIMEOUT = 30
# SQL file located next to this module; its text is executed against
# PostgreSQL to produce the source client rows.
QUERY_PATH = Path(__file__).with_name("routeriai_query.sql")
# Force reload environment variables from repo .env, ignoring system vars:
# ``override=True`` lets values from the .env file replace variables that are
# already set in the process environment.
load_dotenv(find_dotenv(), override=True)
def _post(api_key: str, payload: dict) -> dict:
    """Send one JSON request to the Rivile API and return the decoded reply.

    Args:
        api_key: Value sent in the ``ApiKey`` request header.
        payload: Request body; ``requests`` serializes it as JSON.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the HTTP status is not 200, or if the decoded body
            contains an ``errorMessage`` key.
    """
    response = requests.post(
        BASE_URL,
        json=payload,
        headers={
            "ApiKey": api_key,
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        timeout=TIMEOUT,
    )
    if response.status_code != 200:
        raise RuntimeError(f"Rivile HTTP {response.status_code}: {response.text}")

    data = response.json()
    # The body is checked for an error marker even when the status is 200.
    if "errorMessage" in data:
        raise RuntimeError(f"Rivile API error: {data}")
    return data
def _get_api_key() -> str:
    """Return the API key stored in the ``RIVILE_API_KEY`` environment variable.

    Surrounding whitespace is stripped from the value.

    Raises:
        RuntimeError: If the variable is unset or blank after stripping.
    """
    key = os.environ.get("RIVILE_API_KEY", "").strip()
    if key:
        return key
    raise RuntimeError("Missing RIVILE_API_KEY environment variable.")
def _get_user_from_key(api_key: str) -> str:
    """Return the part of ``api_key`` that precedes its first ``.``.

    The whole key is returned when it contains no dot.
    """
    prefix, _, _ = api_key.partition(".")
    return prefix
def _fetch_mssql_client(
    conn: "pyodbc.Connection",
    client_code: str,
) -> Optional[Dict[str, object]]:
    """Look up one client row in ``dbo.N08_KLIJ`` by ``N08_KODAS_KS``.

    Args:
        conn: Open database connection; only ``cursor()`` is used on it.
        client_code: Value matched against the ``N08_KODAS_KS`` column,
            passed as a bound query parameter.

    Returns:
        A dict mapping each selected column name (taken from the cursor
        description) to its value for the first matching row, or ``None``
        when no row matches.
    """
    query = """
SELECT
N08_KODAS,
N08_KODAS_KS,
N08_PAV,
N08_ADDR,
N08_E_EMAIL,
N08_ADD_DATE,
N08_MOB_TEL,
N08_IM_KODAS,
N08_PVM_KODAS,
N08_ADDUSR,
N08_TIPAS,
N08_KODAS_DS,
N08_BUSENA,
N08_PVM_SKOL_T,
N08_PVM_SKOL_P,
N08_SKOL_SUD,
N08_WEB_POZT,
N08_KODAS_XS_T,
N08_KODAS_XS_P,
N08_RUSIS,
N08_R_DATE
FROM dbo.N08_KLIJ
WHERE N08_KODAS_KS = ?
"""
    cursor = conn.cursor()
    try:
        cursor.execute(query, (client_code,))
        row = cursor.fetchone()
        if not row:
            return None
        # The description must be read while the cursor is still open.
        columns = [c[0] for c in cursor.description]
        return dict(zip(columns, row))
    finally:
        # This function runs once per client, so release the cursor
        # explicitly instead of relying on garbage collection.
        cursor.close()
def _normalize_person_type(value: Optional[str]) -> str:
    """Translate the source person-type flag into the ``N08_TIPAS`` value.

    After stripping whitespace, ``"0"`` becomes ``"2"``; every other input,
    including ``None`` and the empty string, becomes ``"1"``.
    """
    return "2" if (value or "").strip() == "0" else "1"
def _build_n08_payload(row: Dict[str, object]) -> Dict[str, object]:
    """Build the ``N08`` record sent to the API from one source row.

    Args:
        row: Source row; the keys read are ``client_code``, ``name``,
            ``address``, ``email``, ``creation_date``, ``mobile_phone``,
            ``phone``, ``company_code``, ``vat_code``, ``ucreated`` and
            ``person_type``.

    Returns:
        A dict keyed by ``N08_*`` field names. Missing or falsy text fields
        become ``""``, and ``N08_R_DATE`` is always today's date in ISO format.
    """

    def _text(key: str) -> object:
        # Falsy source values (None, "") are sent as an empty string.
        return row.get(key) or ""

    # Both numbers are cleaned up front; the mobile number wins when present.
    mobile = _text("mobile_phone").strip()
    phone = _text("phone").strip()

    creation_date = row.get("creation_date")
    if isinstance(creation_date, (date, datetime)):
        creation_date = creation_date.isoformat()

    from_source = {
        "N08_KODAS": row.get("client_code"),
        "N08_KODAS_KS": row.get("client_code"),
        "N08_PAV": _text("name"),
        "N08_ADDR": _text("address"),
        "N08_E_EMAIL": _text("email"),
        "N08_ADD_DATE": creation_date or "",
        "N08_MOB_TEL": mobile or phone,
        "N08_IM_KODAS": _text("company_code"),
        "N08_PVM_KODAS": _text("vat_code"),
        "N08_ADDUSR": _text("ucreated"),
        "N08_TIPAS": _normalize_person_type(row.get("person_type")),
    }
    # Values that are the same for every client sent by this script.
    fixed = {
        "N08_KODAS_DS": "PT001",
        "N08_BUSENA": "1",
        "N08_PVM_SKOL_T": "1",
        "N08_PVM_SKOL_P": "1",
        "N08_SKOL_SUD": "3",
        "N08_WEB_POZT": "1",
        "N08_KODAS_XS_T": "NEPVM",
        "N08_KODAS_XS_P": "NEPVM",
        "N08_RUSIS": "1",
        "N08_R_DATE": date.today().isoformat(),
    }
    # Merging in this order keeps source fields first, then the fixed ones.
    return {**from_source, **fixed}
def _normalize_value(value: object) -> str:
    """Convert any field value to a stripped string for comparison.

    ``None`` becomes ``""``; dates and datetimes use their ISO format; bytes
    are decoded with undecodable bytes dropped; anything else goes through
    ``str()``.
    """
    if value is None:
        return ""

    if isinstance(value, (date, datetime)):
        text = value.isoformat()
    elif isinstance(value, bytes):
        text = value.decode(errors="ignore")
    else:
        text = str(value)
    return text.strip()
def _diff_fields(
    existing: Optional[Dict[str, object]],
    desired: Dict[str, object],
) -> Dict[str, Tuple[str, str]]:
    """Compare the stored record with the desired one, field by field.

    Args:
        existing: Currently stored record, or ``None``/empty when there is none.
        desired: Record that should be stored.

    Returns:
        A dict mapping field name to ``(old, new)`` normalized strings. With no
        existing record, every desired field is listed with ``""`` as the old
        value; otherwise only fields whose normalized values differ are listed.
    """
    if not existing:
        # Nothing to compare against: report every field as new, even blanks.
        return {key: ("", _normalize_value(value)) for key, value in desired.items()}

    compared = (
        (key, _normalize_value(existing.get(key)), _normalize_value(value))
        for key, value in desired.items()
    )
    return {key: (old, new) for key, old, new in compared if old != new}
def _upsert_client(
    api_key: str,
    mssql_conn: "pyodbc.Connection",
    row: Dict[str, object],
) -> None:
    """Create or update one client through the API, based on a source row.

    The client is looked up by its code over ``mssql_conn``. An existing
    client with no differing fields is skipped; an existing client with
    differences is updated (``EDIT_N08``, oper ``U``); a missing client is
    inserted (``EDIT_N08_FULL``, oper ``I``). Progress and the field-level
    differences are printed to stdout.

    Args:
        api_key: API key; also the source of the ``user`` request parameter.
        mssql_conn: Open connection used to read the current client record.
        row: Source row describing the client.

    Raises:
        RuntimeError: If the row has no usable ``client_code``.
    """
    client_code = str(row.get("client_code") or "").strip()
    if not client_code:
        raise RuntimeError("Missing client_code in source row.")

    n08 = _build_n08_payload(row)
    user = _get_user_from_key(api_key)
    existing = _fetch_mssql_client(mssql_conn, client_code)
    changes = _diff_fields(existing, n08)

    # NOTE(review): the desired record carries today's date in N08_R_DATE
    # (set in _build_n08_payload), so an existing client whose stored date
    # differs is always reported as changed — confirm this is intended.
    if existing and not changes:
        print(f"No changes for client: {client_code}")
        return

    if existing:
        print(f"Updating client: {client_code}")
        for field, (old, new) in changes.items():
            print(f" {field}: '{old}' -> '{new}'")
        method = "EDIT_N08"
        params = {
            "oper": "U",
            "user": user,
            "fld": "N08_KODAS_KS",
            "val": client_code,
        }
    else:
        print(f"Creating client: {client_code}")
        for field, (_, new) in changes.items():
            print(f" {field}: '' -> '{new}'")
        method = "EDIT_N08_FULL"
        params = {"oper": "I", "user": user}

    _post(api_key, {"method": method, "params": params, "data": {"N08": n08}})
def _read_query() -> str:
    """Return the UTF-8 decoded text of the SQL file at ``QUERY_PATH``."""
    with QUERY_PATH.open(encoding="utf-8") as handle:
        return handle.read()
def _fetch_clients() -> Iterable[Dict[str, object]]:
    """Run the query file against PostgreSQL and return all rows.

    Rows come from a ``RealDictCursor``. The connection is closed before
    returning, whether or not the query succeeded.

    Raises:
        RuntimeError: If no PostgreSQL connection could be obtained.
    """
    conn = connect_to_pgsql()
    if conn is None:
        raise RuntimeError("Failed to connect to PostgreSQL.")

    try:
        dict_cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        with dict_cursor:
            dict_cursor.execute(_read_query())
            rows = dict_cursor.fetchall()
    finally:
        conn.close()
    return rows
def main() -> None:
    """Upsert every client returned by the PostgreSQL query.

    All source rows are fetched first; then one MSSQL connection is opened
    and reused for every client, and closed when the loop ends or fails.

    Raises:
        RuntimeError: If the API key is missing or a database connection
            cannot be obtained.
    """
    key = _get_api_key()
    source_rows = _fetch_clients()

    lookup_conn = connect_to_mssql()
    if lookup_conn is None:
        raise RuntimeError("Failed to connect to MSSQL.")

    try:
        for source_row in source_rows:
            _upsert_client(key, lookup_conn, source_row)
    finally:
        lookup_conn.close()
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()