Files
comunidadhll/backend/app/sqlite_to_postgres_migration.py
devRaGonSa 0da8338ba8 Fix
2026-06-05 16:57:25 +02:00

369 lines
13 KiB
Python

"""Idempotent phase-2 migration from displayed SQLite/files into PostgreSQL."""
from __future__ import annotations
import json
import sqlite3
from collections import defaultdict
from contextlib import closing
from pathlib import Path
from typing import Any
from .config import get_storage_path
from .postgres_display_storage import (
connect_postgres as connect_display_postgres,
initialize_postgres_display_storage,
persist_snapshot_record,
)
from .postgres_rcon_storage import initialize_postgres_rcon_storage
# Legacy RCON tables copied from each SQLite source when present. They are
# processed before DISPLAY_TABLES, in the order listed here.
RCON_TABLES = (
    "rcon_historical_targets",
    "rcon_historical_capture_runs",
    "rcon_historical_samples",
    "rcon_historical_checkpoints",
    "rcon_historical_competitive_windows",
    "rcon_admin_log_events",
    "rcon_player_profile_snapshots",
    "rcon_materialized_matches",
    "rcon_match_player_stats",
    "rcon_scoreboard_match_candidates",
)
# Display-side tables copied from each SQLite source when present, in the
# order listed here.
DISPLAY_TABLES = (
    "game_sources",
    "servers",
    "server_snapshots",
    "historical_servers",
    "historical_maps",
    "historical_matches",
    "historical_players",
    "historical_player_match_stats",
    "player_event_raw_ledger",
)
# Server identifier that is excluded from the migration: rows whose slug-like
# columns equal it, rows that reference it by id, and snapshot files whose
# ``server_key`` equals it are all skipped.
SKIP_SLUG = "comunidad-hispana-03"
def migrate_sqlite_to_postgres() -> dict[str, object]:
    """Copy displayed legacy data to PostgreSQL without deleting legacy sources.

    Initializes both PostgreSQL storages, copies every discovered SQLite
    database, imports the JSON snapshot files found in the ``snapshots``
    directory next to the configured SQLite path, and finally re-aligns the
    ``id`` sequences.

    Failures of individual sources, and of the final sequence sync, are
    collected in the summary instead of aborting the run, so the caller always
    receives a report of the work that was attempted.

    Returns:
        A JSON-serializable summary with the keys ``status`` (``"ok"`` or
        ``"completed-with-errors"``), ``source_paths``, ``migrated_tables``,
        ``migrated_domains``, the per-table counters ``rows_read``,
        ``rows_inserted``, ``rows_updated`` and ``rows_skipped``, and
        ``errors``.
    """
    initialize_postgres_rcon_storage()
    initialize_postgres_display_storage()

    # Precisely typed accumulators; the summary dict is only assembled at the
    # end so nothing has to be mutated through an ``object``-typed value.
    source_paths: list[str] = []
    errors: list[object] = []
    table_totals: dict[str, dict[str, int]] = defaultdict(
        lambda: {"read": 0, "inserted": 0, "updated": 0, "skipped": 0}
    )

    for db_path in _discover_sqlite_paths():
        source_paths.append(str(db_path))
        try:
            _migrate_sqlite_path(db_path, table_totals)
        except Exception as error:  # noqa: BLE001 - report all source failures
            errors.append({"source_path": str(db_path), "error": str(error)})

    snapshots_root = get_storage_path().parent / "snapshots"
    if snapshots_root.exists():
        source_paths.append(str(snapshots_root))
        _migrate_snapshot_files(snapshots_root, table_totals, errors)

    # The data has already been copied at this point: a failing sequence sync
    # must be reported, not allowed to discard the whole summary.
    try:
        _sync_sequences()
    except Exception as error:  # noqa: BLE001 - keep the summary of completed work
        errors.append({"source_path": "postgres-sequences", "error": str(error)})

    ordered_totals = sorted(table_totals.items())
    return {
        "status": "ok" if not errors else "completed-with-errors",
        "source_paths": source_paths,
        "migrated_tables": [table_name for table_name, _ in ordered_totals],
        "migrated_domains": [
            "rcon-admin-log-events",
            "rcon-player-profile-snapshots",
            "rcon-historical-capture-samples-and-windows",
            "rcon-materialized-matches",
            "rcon-materialized-player-stats",
            "rcon-safe-scoreboard-candidates",
            "public-scoreboard-historical-matches-and-player-stats",
            "weekly-and-monthly-scoreboard-rankings",
            "displayed-historical-snapshots",
            "live-server-summary-cache",
            "player-event-ledger",
        ],
        "rows_read": {name: counts["read"] for name, counts in ordered_totals},
        "rows_inserted": {name: counts["inserted"] for name, counts in ordered_totals},
        "rows_updated": {name: counts["updated"] for name, counts in ordered_totals},
        "rows_skipped": {name: counts["skipped"] for name, counts in ordered_totals},
        "errors": errors,
    }
def _migrate_sqlite_path(db_path: Path, totals: dict[str, dict[str, int]]) -> None:
    """Copy every known table found in one SQLite database into PostgreSQL.

    Args:
        db_path: SQLite database file to read.
        totals: Per-table counters (``read``/``inserted``/``updated``/
            ``skipped``), updated in place for each copied table.

    Raises:
        Exception: Any SQLite or PostgreSQL error is propagated to the caller.
    """
    with closing(sqlite3.connect(db_path)) as sqlite_connection:
        # Name-based row access is relied upon by the helpers called below.
        sqlite_connection.row_factory = sqlite3.Row
        available_tables = {
            row["name"]
            for row in sqlite_connection.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            ).fetchall()
        }
        tables = [
            table for table in (*RCON_TABLES, *DISPLAY_TABLES) if table in available_tables
        ]
        if not tables:
            # Nothing to migrate from this file: do not open a PostgreSQL
            # connection for it at all.
            return

        # These lookups only touch SQLite, so resolve them before connecting
        # to PostgreSQL.
        historical_server_ids = _legacy_server03_ids(sqlite_connection)
        historical_match_ids = _legacy_match_ids(sqlite_connection, historical_server_ids)
        legacy_rcon_target_ids = _legacy_rcon_target03_ids(sqlite_connection)

        with connect_display_postgres() as postgres_connection:
            postgres_columns = {
                table: _postgres_columns(postgres_connection, table) for table in tables
            }
            for table_name in tables:
                _copy_table(
                    sqlite_connection,
                    postgres_connection,
                    table_name=table_name,
                    postgres_columns=postgres_columns[table_name],
                    totals=totals[table_name],
                    historical_server_ids=historical_server_ids,
                    historical_match_ids=historical_match_ids,
                    legacy_rcon_target_ids=legacy_rcon_target_ids,
                )
def _copy_table(
sqlite_connection: sqlite3.Connection,
postgres_connection: Any,
*,
table_name: str,
postgres_columns: list[str],
totals: dict[str, int],
historical_server_ids: set[int],
historical_match_ids: set[int],
legacy_rcon_target_ids: set[int],
) -> None:
sqlite_columns = [
str(row["name"])
for row in sqlite_connection.execute(f"PRAGMA table_info({table_name})").fetchall()
]
columns = [column for column in sqlite_columns if column in postgres_columns]
if not columns:
return
rows = sqlite_connection.execute(
f"SELECT {', '.join(columns)} FROM {table_name}"
).fetchall()
placeholders = ", ".join(["%s"] * len(columns))
sql = (
f"INSERT INTO {table_name} ({', '.join(columns)}) "
f"VALUES ({placeholders}) ON CONFLICT DO NOTHING"
)
values: list[tuple[object, ...]] = []
for row in rows:
totals["read"] += 1
row_dict = dict(row)
if _skip_row(
table_name,
row_dict,
historical_server_ids=historical_server_ids,
historical_match_ids=historical_match_ids,
legacy_rcon_target_ids=legacy_rcon_target_ids,
):
totals["skipped"] += 1
continue
values.append(tuple(_postgres_value(column, row_dict[column]) for column in columns))
with postgres_connection.cursor() as cursor:
for start in range(0, len(values), 1000):
batch = values[start : start + 1000]
cursor.executemany(sql, batch)
inserted = max(0, int(cursor.rowcount or 0))
totals["inserted"] += inserted
totals["skipped"] += len(batch) - inserted
def _migrate_snapshot_files(
snapshots_root: Path,
totals: dict[str, dict[str, int]],
errors: list[object],
) -> None:
snapshot_totals = totals["displayed_historical_snapshots"]
for snapshot_path in sorted(snapshots_root.glob("*/*.json")):
snapshot_totals["read"] += 1
try:
document = json.loads(snapshot_path.read_text(encoding="utf-8"))
if str(document.get("server_key") or "") == SKIP_SLUG:
snapshot_totals["skipped"] += 1
continue
before = _snapshot_exists(document)
persist_snapshot_record(document)
snapshot_totals["updated" if before else "inserted"] += 1
except Exception as error: # noqa: BLE001 - keep migrating neighboring snapshots
snapshot_totals["skipped"] += 1
errors.append({"source_path": str(snapshot_path), "error": str(error)})
def _snapshot_exists(document: dict[str, object]) -> bool:
    """Return whether PostgreSQL already holds a snapshot with this identity.

    The identity is the document's ``server_key``, ``snapshot_type``,
    ``metric`` and ``window`` values; missing or falsy values are compared as
    empty strings.
    """
    identity_fields = ("server_key", "snapshot_type", "metric", "window")
    with connect_display_postgres() as connection:
        identity = tuple(str(document.get(field) or "") for field in identity_fields)
        match = connection.execute(
            """
SELECT 1 FROM displayed_historical_snapshots
WHERE server_key = %s AND snapshot_type = %s AND metric = %s AND snapshot_window = %s
""",
            identity,
        ).fetchone()
    return bool(match)
def _skip_row(
    table_name: str,
    row: dict[str, object],
    *,
    historical_server_ids: set[int],
    historical_match_ids: set[int],
    legacy_rcon_target_ids: set[int],
) -> bool:
    """Decide whether a source row belongs to the excluded server.

    A row is skipped when one of its slug-like columns equals ``SKIP_SLUG``,
    or when it references an excluded server, match or RCON target by id.

    Args:
        table_name: Table the row was read from.
        row: Column name to value mapping of the row.
        historical_server_ids: Ids of excluded ``historical_servers`` rows.
        historical_match_ids: Ids of excluded ``historical_matches`` rows.
        legacy_rcon_target_ids: Ids of excluded ``rcon_historical_targets`` rows.

    Returns:
        ``True`` if the row must not be migrated.
    """
    slug_columns = ("server_slug", "slug", "external_server_id", "target_key")
    if any(row.get(column) == SKIP_SLUG for column in slug_columns):
        return True

    # Child tables reference the excluded server indirectly, through ids.
    if table_name == "historical_matches":
        return row.get("historical_server_id") in historical_server_ids
    if table_name == "historical_player_match_stats":
        return row.get("historical_match_id") in historical_match_ids

    target_scoped_tables = (
        "rcon_historical_samples",
        "rcon_historical_checkpoints",
        "rcon_historical_competitive_windows",
    )
    if table_name in target_scoped_tables:
        return row.get("target_id") in legacy_rcon_target_ids
    return False
def _legacy_server03_ids(connection: sqlite3.Connection) -> set[int]:
    """Return the ``historical_servers`` ids whose slug equals ``SKIP_SLUG``.

    Returns an empty set when the table does not exist in this database.
    Rows must support access by column name.
    """
    if not _has_table(connection, "historical_servers"):
        return set()
    cursor = connection.execute(
        "SELECT id FROM historical_servers WHERE slug = ?",
        (SKIP_SLUG,),
    )
    server_ids: set[int] = set()
    for record in cursor.fetchall():
        server_ids.add(int(record["id"]))
    return server_ids
def _legacy_rcon_target03_ids(connection: sqlite3.Connection) -> set[int]:
    """Return the ``rcon_historical_targets`` ids that point at ``SKIP_SLUG``.

    A target matches when either its ``external_server_id`` or its
    ``target_key`` equals ``SKIP_SLUG``. Returns an empty set when the table
    does not exist in this database. Rows must support access by column name.
    """
    if not _has_table(connection, "rcon_historical_targets"):
        return set()
    cursor = connection.execute(
        """
SELECT id FROM rcon_historical_targets
WHERE external_server_id = ? OR target_key = ?
""",
        (SKIP_SLUG, SKIP_SLUG),
    )
    target_ids: set[int] = set()
    for record in cursor.fetchall():
        target_ids.add(int(record["id"]))
    return target_ids
def _legacy_match_ids(connection: sqlite3.Connection, historical_server_ids: set[int]) -> set[int]:
if not historical_server_ids or not _has_table(connection, "historical_matches"):
return set()
placeholders = ", ".join(["?"] * len(historical_server_ids))
return {
int(row["id"])
for row in connection.execute(
f"SELECT id FROM historical_matches WHERE historical_server_id IN ({placeholders})",
tuple(sorted(historical_server_ids)),
).fetchall()
}
def _postgres_columns(connection: Any, table_name: str) -> list[str]:
    """List the column names of a ``public`` PostgreSQL table in ordinal order.

    Args:
        connection: PostgreSQL connection exposing ``execute()`` and returning
            rows that support access by column name.
        table_name: Table to inspect in the ``public`` schema.

    Returns:
        The column names; empty when the table has no columns listed in
        ``information_schema.columns``.
    """
    cursor = connection.execute(
        """
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = %s
ORDER BY ordinal_position
""",
        (table_name,),
    )
    records = cursor.fetchall()
    return [str(record["column_name"]) for record in records]
def _sync_sequences() -> None:
    """Align each table's ``id`` sequence with the ids copied into it.

    Rows are copied with their original ids, so the sequences are moved
    forward explicitly. For a table that contains rows, the sequence is set to
    ``MAX(id)`` (at least 1) and marked as already called, so the next
    generated id follows it. For an empty table the sequence is set to 1 and
    marked as *not* called, so the first generated id is 1 rather than 2.
    """
    tables = (
        "game_sources",
        "servers",
        "server_snapshots",
        "historical_servers",
        "historical_maps",
        "historical_matches",
        "historical_players",
        "historical_player_match_stats",
        "player_event_raw_ledger",
        "rcon_historical_targets",
        "rcon_historical_capture_runs",
        "rcon_historical_samples",
        "rcon_historical_competitive_windows",
        "rcon_admin_log_events",
        "rcon_player_profile_snapshots",
        "rcon_materialized_matches",
        "rcon_match_player_stats",
        "rcon_scoreboard_match_candidates",
    )
    with connect_display_postgres() as connection:
        for table_name in tables:
            # table_name comes from the fixed tuple above, never from input,
            # so interpolating it as an identifier is safe here.
            connection.execute(
                f"""
                SELECT setval(
                    pg_get_serial_sequence(%s, 'id'),
                    GREATEST(COALESCE(MAX(id), 1), 1),
                    MAX(id) IS NOT NULL
                )
                FROM {table_name}
                """,
                (table_name,),
            )
def _discover_sqlite_paths() -> list[Path]:
    """Return the SQLite database files to migrate, sorted by path.

    The configured storage path is always a candidate; every ``*.sqlite*``
    file in the same directory is added to it. SQLite's own sidecar files
    (shared memory, write-ahead log and rollback journal) match that pattern
    but are not databases, so they are filtered out.
    """
    configured = get_storage_path()
    candidates = {configured}
    if configured.parent.exists():
        candidates.update(configured.parent.glob("*.sqlite*"))
    sidecar_suffixes = ("-shm", "-wal", "-journal")
    return sorted(
        path
        for path in candidates
        # is_file() is already False for paths that do not exist.
        if path.is_file() and not path.name.endswith(sidecar_suffixes)
    )
def _has_table(connection: sqlite3.Connection, table_name: str) -> bool:
return bool(
connection.execute(
"SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
(table_name,),
).fetchone()
)
def _postgres_value(column: str, value: object) -> object:
if column in {"is_active", "is_teamkill"}:
return bool(value)
return value
def main() -> None:
    """Run the migration and print its summary as indented JSON."""
    summary = migrate_sqlite_to_postgres()
    rendered = json.dumps(summary, ensure_ascii=False, indent=2)
    print(rendered)
# Run the migration only when this module is executed as a script.
if __name__ == "__main__":
    main()