# Synchronise the monthly-price ("KAINA") appendices of one contract from
# PostgreSQL into Rivile.
#
# Flow (see ``main``): read the price rows for a contract code from PostgreSQL,
# make sure the contract header (N51) exists in Rivile, compare the appendix
# rows (N52) currently visible in MSSQL with the desired periods, and - when
# they differ - delete the existing appendices and recreate them through the
# Rivile HTTP API.
from __future__ import annotations

import os
from datetime import date
from pathlib import Path
from typing import Dict, Iterable, Optional

import psycopg2
import psycopg2.extras
import requests
from dotenv import load_dotenv

from uv_app.core.mssql import connect_to_mssql
from uv_app.core.pgsql import connect_to_pgsql

BASE_URL = "https://api.manorivile.lt/client/v2"
TIMEOUT = 30  # passed as the ``timeout`` argument of ``requests.post``
SERVICE_CODE = "KAINA"  # N52_KODAS value of every appendix managed here
MIN_YEAR = 2000  # years below this are clamped up to it
OPEN_END_DATE = date(3999, 1, 1)  # end date used for open-ended periods

# The .env file is loaded at import time and overrides existing variables.
DOTENV_PATH = Path(__file__).resolve().parents[1] / ".env"
load_dotenv(DOTENV_PATH, override=True)

# SQL file living next to this module; executed with one parameter.
QUERY_PATH = Path(__file__).with_name("menesine_kaina_by_sutarties_kodas.sql")


def _post(api_key: str, payload: dict) -> dict:
    """POST *payload* as JSON to ``BASE_URL`` and return the decoded reply.

    Raises:
        RuntimeError: on a non-200 status, on a body that is not valid JSON,
            or when the decoded reply contains an ``errorMessage`` key.
    """
    headers = {
        "ApiKey": api_key,
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    response = requests.post(
        BASE_URL,
        json=payload,
        headers=headers,
        timeout=TIMEOUT,
    )
    if response.status_code != 200:
        raise RuntimeError(f"Rivile HTTP {response.status_code}: {response.text}")
    try:
        data = response.json()
    except ValueError as err:
        # Report an undecodable body the same way as every other API failure
        # instead of leaking a bare JSON decoding error.
        raise RuntimeError(
            f"Rivile returned a non-JSON response: {response.text}"
        ) from err
    if "errorMessage" in data:
        raise RuntimeError(f"Rivile API error: {data}")
    return data


def _api_user(api_key: str) -> str:
    """Return the part of *api_key* before its first ``"."``.

    This value is sent as the ``user`` request parameter.
    NOTE(review): presumably keys look like ``<user>.<secret>`` - confirm.
    """
    return api_key.split(".", 1)[0]


def _get_api_key() -> str:
    """Return the stripped ``RIVILE_API_KEY`` environment variable.

    Raises:
        RuntimeError: if the variable is unset or blank.
    """
    api_key = os.getenv("RIVILE_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("Missing RIVILE_API_KEY environment variable.")
    return api_key


def _read_query() -> str:
    """Return the text of the SQL file at ``QUERY_PATH`` (UTF-8)."""
    return QUERY_PATH.read_text(encoding="utf-8")


def _fetch_rows(sutarties_kodas: str) -> Iterable[Dict[str, object]]:
    """Run the price query in PostgreSQL for *sutarties_kodas*.

    Returns the fetched rows as dict-like records (``RealDictCursor``).
    The connection is always closed before returning.

    Raises:
        RuntimeError: if ``connect_to_pgsql`` returns ``None``.
    """
    conn = connect_to_pgsql()
    if conn is None:
        raise RuntimeError("Failed to connect to PostgreSQL.")
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute(_read_query(), (sutarties_kodas,))
            return cursor.fetchall()
    finally:
        conn.close()


def _contract_exists(conn: "pyodbc.Connection", contract_code: str) -> bool:
    """Return True if ``dbo.N51_SUTH`` has a row with this contract code."""
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT 1
        FROM dbo.N51_SUTH
        WHERE N51_KODAS_KT = ?
        """,
        (contract_code,),
    )
    return cursor.fetchone() is not None


def _fetch_existing_appendices(
    conn: "pyodbc.Connection",
    contract_code: str,
) -> list[dict[str, object]]:
    """Return the ``dbo.N52_SUTD`` rows of *contract_code* for ``SERVICE_CODE``.

    Each row becomes a dict keyed by the selected column names; the begin and
    end dates are converted to ``date`` on the SQL side.
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT
            N52_KODAS_KT,
            N52_KODAS_K0,
            N52_DOK_NR,
            N52_KODAS,
            CONVERT(date, N52_BEG_DATE) AS N52_BEG_DATE,
            CONVERT(date, N52_END_DATE) AS N52_END_DATE,
            N52_KAINA,
            N52_PASTABOS
        FROM dbo.N52_SUTD
        WHERE N52_KODAS_KT = ?
          AND N52_KODAS = ?
        """,
        (contract_code, SERVICE_CODE),
    )
    rows = cursor.fetchall()
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in rows]


def _choose_contract_type(conn: "pyodbc.Connection") -> Optional[str]:
    """Return the most frequent non-NULL ``N51_TIPAS`` in ``dbo.N51_SUTH``.

    Ties are broken by the type value itself (ascending). Returns ``None``
    when the query yields no row.
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT N51_TIPAS, COUNT(*) AS cnt
        FROM dbo.N51_SUTH
        WHERE N51_TIPAS IS NOT NULL
        GROUP BY N51_TIPAS
        ORDER BY cnt DESC, N51_TIPAS
        """
    )
    row = cursor.fetchone()
    if not row:
        return None
    return str(row[0]).strip()


def _create_contract(
    api_key: str,
    contract_code: str,
    client_code: str,
    start: date,
    contract_type: str,
) -> None:
    """Insert a contract header (``EDIT_N51_FULL``, ``oper`` "I").

    A *start* earlier than 1 January of ``MIN_YEAR`` is replaced by today's
    date. The contract name (``N51_PAV``) is set to the contract code.
    """
    if start < date(MIN_YEAR, 1, 1):
        start = date.today()
    payload = {
        "method": "EDIT_N51_FULL",
        "params": {
            "oper": "I",
            "user": _api_user(api_key),
        },
        "data": {
            "N51": {
                "N51_KODAS_KT": contract_code,
                "N51_KODAS_KS": client_code,
                "N51_PAV": contract_code,
                "N51_OP_DATA": start.isoformat(),
                "N51_BEG_DATE": start.isoformat(),
                "N51_TIPAS": contract_type,
            }
        },
    }
    _post(api_key, payload)


def _update_contract(
    api_key: str,
    contract_code: str,
    client_code: str,
    contract_type: str,
) -> None:
    """Update a contract header (``EDIT_N51``, ``oper`` "U").

    The record is addressed by ``N51_KODAS_KT``; client code, name and type
    are sent, dates are not.
    """
    payload = {
        "method": "EDIT_N51",
        "params": {
            "oper": "U",
            "user": _api_user(api_key),
            "fld": "N51_KODAS_KT",
            "val": contract_code,
        },
        "data": {
            "N51": {
                "N51_KODAS_KT": contract_code,
                "N51_KODAS_KS": client_code,
                "N51_PAV": contract_code,
                "N51_TIPAS": contract_type,
            }
        },
    }
    _post(api_key, payload)


def _build_n52_fields(
    contract_code: str,
    kodas_k0: str,
    service_code: str,
    dok_nr: str,
    start_date: date,
    end_date: date,
    price: float,
    comment: Optional[str],
) -> dict[str, str]:
    """Build the N52 record shared by appendix insert and update requests.

    The price is formatted with two decimals, ``N52_OP_DATA`` is today's
    date, and ``N52_PASTABOS`` is included only when *comment* is truthy.
    """
    n52 = {
        "N52_KODAS_KT": contract_code,
        "N52_KODAS_K0": kodas_k0,
        "N52_DOK_NR": dok_nr,
        "N52_RUSIS": "2",
        "N52_KODAS": service_code,
        "N52_KIEKIS": "1",
        "N52_KAINA": f"{price:.2f}",
        "N52_OP_DATA": date.today().isoformat(),
        "N52_BEG_DATE": start_date.isoformat(),
        "N52_END_DATE": end_date.isoformat(),
        "N52_POZ_TERM": "1",
        "N52_POZ_TERM_KS": "1",
        "N52_TERM1": "1",
        "N52_TERM2": "0",
        "N52_TERM3": "0",
        "N52_TERM4": "0",
        "N52_TERM5": "0",
        "N52_TERM6": "0",
        "N52_POZ_DATE": "1",
        "N52_POZ_KS": "0",
    }
    if comment:
        n52["N52_PASTABOS"] = comment
    return n52


def _create_appendix(
    api_key: str,
    contract_code: str,
    kodas_k0: str,
    dok_nr: str,
    start_date: date,
    end_date: date,
    price: float,
    comment: Optional[str],
) -> None:
    """Insert one appendix (``EDIT_N52``, ``oper`` "I") for ``SERVICE_CODE``."""
    payload = {
        "method": "EDIT_N52",
        "params": {
            "oper": "I",
            "user": _api_user(api_key),
        },
        "data": {
            "N52": _build_n52_fields(
                contract_code,
                kodas_k0,
                SERVICE_CODE,
                dok_nr,
                start_date,
                end_date,
                price,
                comment,
            )
        },
    }
    _post(api_key, payload)


def _update_appendix(
    api_key: str,
    contract_code: str,
    kodas_k0: str,
    service_code: str,
    dok_nr: str,
    start_date: date,
    end_date: date,
    price: float,
    comment: Optional[str],
) -> None:
    """Update one appendix (``EDIT_N52``, ``oper`` "U").

    The record is addressed by the comma-joined combination of contract
    code, K0 code, service code and document number.
    """
    payload = {
        "method": "EDIT_N52",
        "params": {
            "oper": "U",
            "user": _api_user(api_key),
            "fld": "N52_KODAS_KT,N52_KODAS_K0,N52_KODAS,N52_DOK_NR",
            "val": f"{contract_code},{kodas_k0},{service_code},{dok_nr}",
        },
        "data": {
            "N52": _build_n52_fields(
                contract_code,
                kodas_k0,
                service_code,
                dok_nr,
                start_date,
                end_date,
                price,
                comment,
            )
        },
    }
    _post(api_key, payload)


def _normalize_decimal(value: object) -> str:
    """Return *value* as a two-decimal string, or ``""`` for ``None``."""
    if value is None:
        return ""
    return f"{float(value):.2f}"


def _is_desired_state(
    existing: list[dict[str, object]],
    periods: list[dict[str, object]],
) -> bool:
    """Return True if *existing* appendix rows already match *periods*.

    Both lists must have the same length, and for the period at 1-based
    position ``idx`` there must be an existing row whose K0 code is
    ``"<SERVICE_CODE>-<idx>"``, whose document number is ``idx`` and whose
    begin date, end date and two-decimal price equal the period's.
    Comments are not compared.
    """
    if len(existing) != len(periods):
        return False
    existing_by_k0 = {
        str(row.get("N52_KODAS_K0") or "").strip(): row for row in existing
    }
    for idx, period in enumerate(periods, start=1):
        row = existing_by_k0.get(f"{SERVICE_CODE}-{idx}")
        if not row:
            return False
        if str(row.get("N52_DOK_NR") or "").strip() != str(idx):
            return False
        if row.get("N52_BEG_DATE") != period["start"]:
            return False
        if row.get("N52_END_DATE") != period["end"]:
            return False
        if _normalize_decimal(row.get("N52_KAINA")) != _normalize_decimal(
            period["price"]
        ):
            return False
    return True


def _delete_appendix(
    api_key: str,
    contract_code: str,
    kodas_k0: str,
    service_code: str,
    dok_nr: str,
) -> None:
    """Delete one appendix (``EDIT_N52``, ``oper`` "D").

    The record is addressed by the comma-joined combination of contract
    code, K0 code, service code and document number.
    """
    payload = {
        "method": "EDIT_N52",
        "params": {
            "oper": "D",
            "user": _api_user(api_key),
            "fld": "N52_KODAS_KT,N52_KODAS_K0,N52_KODAS,N52_DOK_NR",
            "val": f"{contract_code},{kodas_k0},{service_code},{dok_nr}",
        },
        "data": {
            "N52": {
                "N52_KODAS_KT": contract_code,
                "N52_KODAS_K0": kodas_k0,
                "N52_KODAS": service_code,
                "N52_DOK_NR": dok_nr,
            }
        },
    }
    _post(api_key, payload)


def _normalize_year_month(year: int, month: int) -> tuple[int, int]:
    """Clamp *year* up to ``MIN_YEAR`` and map an out-of-range month to 1."""
    norm_year = year if year >= MIN_YEAR else MIN_YEAR
    norm_month = 1 if month < 1 or month > 12 else month
    return norm_year, norm_month


def _build_periods(rows: list[dict[str, object]]) -> list[dict[str, object]]:
    """Turn price rows into periods sorted by start date.

    Each row must provide ``metai`` (year), ``menuo`` (month) and ``kaina``
    (price); ``comments`` is optional. A period starts on the first day of
    the normalised year/month. The first and the last period end on
    ``OPEN_END_DATE``; every period in between ends the day before the next
    period starts.

    Returns a list of dicts with keys ``start``, ``end``, ``price`` and
    ``comment``.
    """
    normalized = []
    for row in rows:
        norm_year, norm_month = _normalize_year_month(
            int(row["metai"]), int(row["menuo"])
        )
        normalized.append(
            {
                "start": date(norm_year, norm_month, 1),
                "price": float(row["kaina"]),
                "comment": row.get("comments"),
            }
        )
    normalized.sort(key=lambda item: item["start"])

    last_idx = len(normalized) - 1
    periods = []
    for idx, item in enumerate(normalized):
        # NOTE(review): the first period is kept open-ended even when later
        # periods exist, so it overlaps them - confirm this is intended.
        if idx == 0 or idx == last_idx:
            end_date = OPEN_END_DATE
        else:
            # date.resolution is one day: end the day before the next start.
            end_date = normalized[idx + 1]["start"] - date.resolution
        periods.append(
            {
                "start": item["start"],
                "end": end_date,
                "price": item["price"],
                "comment": item["comment"],
            }
        )
    return periods


def main() -> None:
    """Prompt for a contract code and bring its appendices up to date.

    Nothing is changed when MSSQL already holds appendices that start later
    than the newest desired period, or when the existing appendices already
    match the desired periods. Otherwise every existing appendix that has
    both a document number and a K0 code is deleted and the desired periods
    are created, numbered from 1.

    Raises:
        RuntimeError: on a missing client code, a failed database
            connection, an unresolved contract type, a missing API key or
            any API failure.
    """
    sutarties_kodas = input("Sutarties kodas: ").strip()
    if not sutarties_kodas:
        print("Missing sutarties kodas.")
        return

    rows = list(_fetch_rows(sutarties_kodas))
    if not rows:
        print("No menesine_kaina rows found.")
        return

    contract_code = f"SUT-{sutarties_kodas}"
    # The client code is taken from the first row only.
    client_code = str(rows[0].get("kliento_kodas") or "").strip()
    if not client_code:
        raise RuntimeError("Missing client code for sutartis.")

    api_key = _get_api_key()
    mssql_conn = connect_to_mssql()
    if mssql_conn is None:
        raise RuntimeError("Failed to connect to MSSQL.")

    try:
        contract_type = _choose_contract_type(mssql_conn)
        if not contract_type:
            raise RuntimeError("Failed to resolve N51_TIPAS for Pardavimo.")

        periods = _build_periods(rows)
        if not periods:
            print("No valid periods found.")
            return

        if not _contract_exists(mssql_conn, contract_code):
            first = periods[0]["start"]
            _create_contract(api_key, contract_code, client_code, first, contract_type)
            print(f"Created contract {contract_code}")
        else:
            _update_contract(api_key, contract_code, client_code, contract_type)
            print(f"Updated contract {contract_code}")

        existing = _fetch_existing_appendices(mssql_conn, contract_code)

        # Refuse to touch anything if MSSQL has periods starting after the
        # newest desired one: deleting them would lose data PGSQL lacks.
        max_desired_start = max(p["start"] for p in periods)
        newer_existing = [
            row
            for row in existing
            if row.get("N52_BEG_DATE") and row["N52_BEG_DATE"] > max_desired_start
        ]
        if newer_existing:
            print("Found newer MSSQL periods not in PGSQL; skipping changes.")
            for row in newer_existing:
                print(
                    f" Existing period: {row.get('N52_BEG_DATE')} - {row.get('N52_END_DATE')}"
                )
            return

        if _is_desired_state(existing, periods):
            print("Already in desired state; no changes.")
            return

        # Replace strategy: remove what is there, then recreate from scratch.
        for row in existing:
            dok_nr = str(row.get("N52_DOK_NR") or "").strip()
            kodas_k0 = str(row.get("N52_KODAS_K0") or "").strip()
            if dok_nr and kodas_k0:
                _delete_appendix(
                    api_key,
                    contract_code,
                    kodas_k0,
                    SERVICE_CODE,
                    dok_nr,
                )
                print(f"Deleted appendix {kodas_k0} ({dok_nr})")

        for number, period in enumerate(periods, start=1):
            dok_nr = str(number)
            _create_appendix(
                api_key,
                contract_code,
                f"{SERVICE_CODE}-{dok_nr}",
                dok_nr,
                period["start"],
                period["end"],
                period["price"],
                period["comment"],
            )
            print(f"Created appendix for {period['start']} - {period['end']}")
    finally:
        mssql_conn.close()


if __name__ == "__main__":
    main()