171 lines
4.9 KiB
Python
171 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, Optional
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from dotenv import load_dotenv
|
|
|
|
from uv_app.core.mssql import connect_to_mssql
|
|
from uv_app.core.pgsql import connect_to_pgsql
|
|
|
|
MIN_YEAR = 2000
|
|
OPEN_END_DATE = "3999-01-01"
|
|
SERVICE_CODE = "STATUSAS"
|
|
|
|
DOTENV_PATH = Path(__file__).resolve().parents[2] / ".env"
|
|
load_dotenv(DOTENV_PATH, override=True)
|
|
|
|
QUERY_PATH = Path(__file__).with_name("bukle_by_sutarties_kodas.sql")
|
|
|
|
|
|
def _read_query() -> str:
|
|
return QUERY_PATH.read_text(encoding="utf-8")
|
|
|
|
|
|
def _fetch_pg_rows(sutarties_kodas: str) -> Iterable[Dict[str, object]]:
|
|
conn = connect_to_pgsql()
|
|
if conn is None:
|
|
raise RuntimeError("Failed to connect to PostgreSQL.")
|
|
try:
|
|
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
|
|
cursor.execute(_read_query(), (sutarties_kodas,))
|
|
return cursor.fetchall()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _fetch_mssql_appendices(contract_code: str) -> list[dict[str, object]]:
|
|
conn = connect_to_mssql()
|
|
if conn is None:
|
|
raise RuntimeError("Failed to connect to MSSQL.")
|
|
try:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT
|
|
N52_KODAS_KT,
|
|
N52_KODAS_K0,
|
|
N52_DOK_NR,
|
|
N52_KODAS,
|
|
CONVERT(date, N52_BEG_DATE) AS N52_BEG_DATE,
|
|
CONVERT(date, N52_END_DATE) AS N52_END_DATE,
|
|
N52_VISKAS
|
|
FROM dbo.N52_SUTD
|
|
WHERE N52_KODAS_KT = ?
|
|
AND N52_KODAS = ?
|
|
""",
|
|
(contract_code, SERVICE_CODE),
|
|
)
|
|
rows = cursor.fetchall()
|
|
columns = [c[0] for c in cursor.description]
|
|
return [dict(zip(columns, row)) for row in rows]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _normalize_date(value: object) -> str:
|
|
if value is None:
|
|
return ""
|
|
return str(value)
|
|
|
|
|
|
def _build_periods(rows: list[dict[str, object]]) -> list[dict[str, object]]:
|
|
normalized = []
|
|
for row in rows:
|
|
start = row.get("data")
|
|
if start is None or str(start) < f"{MIN_YEAR}-01-01":
|
|
start = f"{MIN_YEAR}-01-01"
|
|
normalized.append(
|
|
{
|
|
"start": start,
|
|
"bukle": int(row["bukle"]),
|
|
}
|
|
)
|
|
normalized.sort(key=lambda item: str(item["start"]))
|
|
periods = []
|
|
for idx, item in enumerate(normalized):
|
|
start_date = str(item["start"])
|
|
if idx == len(normalized) - 1:
|
|
end_date = OPEN_END_DATE
|
|
else:
|
|
next_start = normalized[idx + 1]["start"]
|
|
end_date = str(next_start - date.resolution)
|
|
active = item["bukle"] in {2, 3}
|
|
periods.append(
|
|
{
|
|
"start": start_date,
|
|
"end": end_date,
|
|
"active": active,
|
|
"k0": f"{SERVICE_CODE}-{idx + 1}",
|
|
"dok_nr": str(idx + 1),
|
|
}
|
|
)
|
|
return periods
|
|
|
|
|
|
def _explain_mismatch(existing: list[dict[str, object]], desired: list[dict[str, object]]) -> None:
|
|
print("=== Desired (from PGSQL) ===")
|
|
for row in desired:
|
|
print(row)
|
|
print("=== Existing (from MSSQL) ===")
|
|
for row in existing:
|
|
print(
|
|
{
|
|
"k0": str(row.get("N52_KODAS_K0") or "").strip(),
|
|
"dok_nr": str(row.get("N52_DOK_NR") or "").strip(),
|
|
"start": _normalize_date(row.get("N52_BEG_DATE")),
|
|
"end": _normalize_date(row.get("N52_END_DATE")),
|
|
"active": int(row.get("N52_VISKAS") or 0) == 1,
|
|
}
|
|
)
|
|
|
|
existing_keys = {
|
|
(
|
|
_normalize_date(row.get("N52_BEG_DATE")),
|
|
_normalize_date(row.get("N52_END_DATE")),
|
|
int(row.get("N52_VISKAS") or 0) == 1,
|
|
str(row.get("N52_KODAS_K0") or "").strip(),
|
|
str(row.get("N52_DOK_NR") or "").strip(),
|
|
)
|
|
for row in existing
|
|
}
|
|
desired_keys = {
|
|
(
|
|
row["start"],
|
|
row["end"],
|
|
row["active"],
|
|
row["k0"],
|
|
row["dok_nr"],
|
|
)
|
|
for row in desired
|
|
}
|
|
|
|
print("=== Missing in MSSQL ===")
|
|
for key in sorted(desired_keys - existing_keys):
|
|
print(key)
|
|
|
|
print("=== Extra in MSSQL ===")
|
|
for key in sorted(existing_keys - desired_keys):
|
|
print(key)
|
|
|
|
|
|
def main() -> None:
|
|
sutarties_kodas = input("Sutarties kodas: ").strip()
|
|
if not sutarties_kodas:
|
|
print("Missing sutarties kodas.")
|
|
return
|
|
rows = list(_fetch_pg_rows(sutarties_kodas))
|
|
if not rows:
|
|
print("No bukle rows found in PGSQL.")
|
|
return
|
|
contract_code = f"SUT-{sutarties_kodas}"
|
|
existing = _fetch_mssql_appendices(contract_code)
|
|
desired = _build_periods(rows)
|
|
_explain_mismatch(existing, desired)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|