from __future__ import annotations import os import argparse from datetime import date, timedelta from pathlib import Path from typing import Dict, Iterable, Optional import psycopg2 import psycopg2.extras import requests from dotenv import load_dotenv from uv_app.core.mssql import connect_to_mssql from uv_app.core.pgsql import connect_to_pgsql BASE_URL = "https://api.manorivile.lt/client/v2" TIMEOUT = 30 SERVICE_CODE = "STATUSAS" MIN_YEAR = 2000 OPEN_END_DATE = date(3999, 1, 1) DOTENV_PATH = Path(__file__).resolve().parents[1] / ".env" load_dotenv(DOTENV_PATH, override=True) QUERY_PATH = Path(__file__).with_name("bukle_by_sutarties_kodas.sql") def _post(api_key: str, payload: dict) -> dict: headers = { "ApiKey": api_key, "Content-Type": "application/json", "Accept": "application/json", } response = requests.post( BASE_URL, json=payload, headers=headers, timeout=TIMEOUT, ) if response.status_code != 200: raise RuntimeError(f"Rivile HTTP {response.status_code}: {response.text}") data = response.json() if "errorMessage" in data: raise RuntimeError(f"Rivile API error: {data}") return data def _get_api_key() -> str: api_key = os.getenv("RIVILE_API_KEY", "").strip() if not api_key: raise RuntimeError("Missing RIVILE_API_KEY environment variable.") return api_key def _read_query() -> str: return QUERY_PATH.read_text(encoding="utf-8") def _fetch_rows(sutarties_kodas: str) -> Iterable[Dict[str, object]]: conn = connect_to_pgsql() if conn is None: raise RuntimeError("Failed to connect to PostgreSQL.") try: with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor: cursor.execute(_read_query(), (sutarties_kodas,)) return cursor.fetchall() finally: conn.close() def _contract_exists(conn: "pyodbc.Connection", contract_code: str) -> bool: cursor = conn.cursor() cursor.execute( """ SELECT 1 FROM dbo.N51_SUTH WHERE N51_KODAS_KT = ? """, (contract_code,), ) return cursor.fetchone() is not None def _fetch_existing_appendices( conn: "pyodbc.Connection", contract_code: str, ) -> list[dict[str, object]]: cursor = conn.cursor() cursor.execute( """ SELECT N52_KODAS_KT, N52_KODAS_K0, N52_DOK_NR, N52_KODAS, CONVERT(date, N52_BEG_DATE) AS N52_BEG_DATE, CONVERT(date, N52_END_DATE) AS N52_END_DATE, N52_KAINA, N52_PASTABOS, N52_TERM1, N52_TERM2, N52_TERM3, N52_TERM4, N52_TERM5, N52_TERM6, N52_POZ_TERM, N52_POZ_TERM_KS, N52_POZ_DATE, N52_POZ_KS FROM dbo.N52_SUTD WHERE N52_KODAS_KT = ? AND N52_KODAS = ? """, (contract_code, SERVICE_CODE), ) rows = cursor.fetchall() columns = [c[0] for c in cursor.description] return [dict(zip(columns, row)) for row in rows] def _choose_contract_type(conn: "pyodbc.Connection") -> Optional[str]: cursor = conn.cursor() cursor.execute( """ SELECT N51_TIPAS, COUNT(*) AS cnt FROM dbo.N51_SUTH WHERE N51_TIPAS IS NOT NULL GROUP BY N51_TIPAS ORDER BY cnt DESC, N51_TIPAS """ ) row = cursor.fetchone() if not row: return None return str(row[0]).strip() def _create_contract( api_key: str, contract_code: str, client_code: str, start: date, contract_type: str, ) -> None: if start < date(MIN_YEAR, 1, 1): start = date.today() payload = { "method": "EDIT_N51_FULL", "params": { "oper": "I", "user": api_key.split(".", 1)[0], }, "data": { "N51": { "N51_KODAS_KT": contract_code, "N51_KODAS_KS": client_code, "N51_PAV": contract_code, "N51_OP_DATA": start.isoformat(), "N51_BEG_DATE": start.isoformat(), "N51_TIPAS": contract_type, } }, } _post(api_key, payload) def _update_contract( api_key: str, contract_code: str, client_code: str, contract_type: str, ) -> None: payload = { "method": "EDIT_N51", "params": { "oper": "U", "user": api_key.split(".", 1)[0], "fld": "N51_KODAS_KT", "val": contract_code, }, "data": { "N51": { "N51_KODAS_KT": contract_code, "N51_KODAS_KS": client_code, "N51_PAV": contract_code, "N51_TIPAS": contract_type, } }, } _post(api_key, payload) def _delete_appendix( api_key: str, contract_code: str, kodas_k0: str, service_code: str, dok_nr: str, ) -> None: payload = { "method": "EDIT_N52", "params": { "oper": "D", "user": api_key.split(".", 1)[0], "fld": "N52_KODAS_KT,N52_KODAS_K0,N52_KODAS,N52_DOK_NR", "val": f"{contract_code},{kodas_k0},{service_code},{dok_nr}", }, "data": { "N52": { "N52_KODAS_KT": contract_code, "N52_KODAS_K0": kodas_k0, "N52_KODAS": service_code, "N52_DOK_NR": dok_nr, } }, } _post(api_key, payload) def _create_appendix( api_key: str, contract_code: str, kodas_k0: str, dok_nr: str, start_date: date, end_date: date, active: bool, comment: Optional[str], ) -> None: n52 = { "N52_KODAS_KT": contract_code, "N52_KODAS_K0": kodas_k0, "N52_DOK_NR": dok_nr, "N52_RUSIS": "2", "N52_KODAS": SERVICE_CODE, "N52_KIEKIS": "1", "N52_KAINA": "0", "N52_OP_DATA": date.today().isoformat(), "N52_BEG_DATE": start_date.isoformat(), "N52_END_DATE": end_date.isoformat(), "N52_TERM1": "1", "N52_TERM2": "0", "N52_TERM3": "0", "N52_TERM4": "0", "N52_TERM5": "0", "N52_TERM6": "0", "N52_POZ_TERM": "1", "N52_POZ_TERM_KS": "1", "N52_POZ_DATE": "1", "N52_POZ_KS": "0", "N52_VISKAS": "1" if active else "0", } if comment: n52["N52_PASTABOS"] = comment payload = { "method": "EDIT_N52", "params": { "oper": "I", "user": api_key.split(".", 1)[0], }, "data": {"N52": n52}, } _post(api_key, payload) def _is_desired_state( existing: list[dict[str, object]], periods: list[dict[str, object]], ) -> bool: if len(existing) != len(periods): return False existing_keys = [] for row in existing: start = row.get("N52_BEG_DATE") end = row.get("N52_END_DATE") if not start or not end: return False existing_keys.append((start, end)) desired_keys = [(p["start"], p["end"]) for p in periods] existing_keys.sort() desired_keys.sort() return existing_keys == desired_keys def _normalize_date(value: Optional[date]) -> date: if value is None or value < date(MIN_YEAR, 1, 1): return date(MIN_YEAR, 1, 1) return value def _build_periods(rows: list[dict[str, object]]) -> list[dict[str, object]]: normalized = [] for row in rows: start = _normalize_date(row.get("data")) normalized.append( { "start": start, "bukle": int(row["bukle"]), "comment": row.get("comments"), } ) normalized.sort(key=lambda item: item["start"]) periods = [] for idx, item in enumerate(normalized): start_date = item["start"] if idx == len(normalized) - 1: end_date = OPEN_END_DATE else: next_start = normalized[idx + 1]["start"] end_date = next_start - timedelta(days=1) bukle = item["bukle"] active = bukle in {2, 3} periods.append( { "start": start_date, "end": end_date, "active": active, "comment": item["comment"], } ) return periods def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("--user-id", dest="user_id") parser.add_argument("--sutartis-id", dest="sutartis_id") return parser.parse_args() def _get_sutarties_kodas(args: argparse.Namespace) -> str: sutarties_kodas = (args.sutartis_id or "").strip() if sutarties_kodas: return sutarties_kodas return input("Sutarties kodas: ").strip() def main() -> None: args = _parse_args() sutarties_kodas = _get_sutarties_kodas(args) if not sutarties_kodas: print("Missing sutarties kodas.") return rows = list(_fetch_rows(sutarties_kodas)) if not rows: print("No bukle rows found.") return contract_code = f"SUT-{sutarties_kodas}" client_code = str(rows[0].get("sutarties_kodas") or "").strip() if not client_code: raise RuntimeError("Missing client code for sutartis.") api_key = _get_api_key() mssql_conn = connect_to_mssql() if mssql_conn is None: raise RuntimeError("Failed to connect to MSSQL.") try: contract_type = _choose_contract_type(mssql_conn) if not contract_type: raise RuntimeError("Failed to resolve N51_TIPAS for Pardavimo.") periods = _build_periods(rows) if not periods: print("No valid periods found.") return if not _contract_exists(mssql_conn, contract_code): first = periods[0]["start"] _create_contract(api_key, contract_code, sutarties_kodas, first, contract_type) print(f"Created contract {contract_code}") else: _update_contract(api_key, contract_code, sutarties_kodas, contract_type) print(f"Updated contract {contract_code}") existing = _fetch_existing_appendices(mssql_conn, contract_code) max_desired_start = max(p["start"] for p in periods) newer_existing = [ row for row in existing if row.get("N52_BEG_DATE") and row["N52_BEG_DATE"] > max_desired_start ] if newer_existing: print("Found newer MSSQL periods not in PGSQL; skipping changes.") for row in newer_existing: print( f" Existing period: {row.get('N52_BEG_DATE')} - {row.get('N52_END_DATE')}" ) return if _is_desired_state(existing, periods): print("Already in desired state; no changes.") return for row in existing: dok_nr = str(row.get("N52_DOK_NR") or "").strip() kodas_k0 = str(row.get("N52_KODAS_K0") or "").strip() if dok_nr and kodas_k0: _delete_appendix( api_key, contract_code, kodas_k0, SERVICE_CODE, dok_nr, ) print(f"Deleted appendix {kodas_k0} ({dok_nr})") next_number = 1 for period in periods: start_date = period["start"] end_date = period["end"] active = period["active"] comment = period["comment"] dok_nr = str(next_number) kodas_k0 = f"{SERVICE_CODE}-{dok_nr}" _create_appendix( api_key, contract_code, kodas_k0, dok_nr, start_date, end_date, active, comment, ) print(f"Created appendix for {start_date} - {end_date}") next_number += 1 finally: mssql_conn.close() if __name__ == "__main__": main()