import os
import uuid
from datetime import datetime
from enum import Enum as PyEnum
from typing import Any, Optional
import psycopg2
import requests
from sqlalchemy import Column, DateTime
from sqlalchemy import Enum as SQLAlchemyEnum
from sqlalchemy import Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from pyside_demo.db.sql import (
SQL_CHECK_FOR_CONFLICTS,
SQL_CREATE_TABLE,
SQL_DELETE_ITEM,
SQL_FETCH_ITEMS,
SQL_UPDATE_OR_INSERT_ITEM,
)
class Base(DeclarativeBase):
    """
    Common declarative root for the application's ORM models.

    Models that subclass ``Base`` are registered on ``Base.metadata``; the
    class itself adds nothing on top of SQLAlchemy's ``DeclarativeBase``.
    """
# File name of the local SQLite database. ``Database.__init__`` interpolates
# it into the ``sqlite:///`` engine URL.
SQLITE_FILE_NAME: str = "local.db"
class SyncStatus(str, PyEnum):
    """
    Synchronisation state of a locally stored item.

    The enum mixes in ``str``, so every member compares equal to its plain
    string value (for example ``SyncStatus.SYNCED == "synced"``).
    """

    # Local copy has been pushed to / pulled from the remote database.
    SYNCED = "synced"
    # Local copy was created or edited and still has to be uploaded.
    MODIFIED = "modified"
    # Item was deleted locally (kept as a marker row).
    DELETED = "deleted"
    # The remote copy has a higher version than the local one.
    CONFLICT = "conflict"
class Item(Base):
    """
    ORM model for one row of the ``items`` table.

    Besides the user-visible ``name`` and ``description`` each row carries
    the bookkeeping needed for synchronisation: creation/update timestamps,
    a ``version`` counter and a ``sync_status``.
    """

    __tablename__ = "items"

    # Primary key: a random UUID4 rendered as a string, generated by default.
    id: Any = Column(
        String,
        primary_key=True,
        default=lambda: str(uuid.uuid4()),
    )

    # User-editable payload.
    name: Any = Column(String)
    description: Any = Column(String)

    # Timestamps come from ``datetime.utcnow`` (naive UTC values);
    # ``updated_at`` is additionally refreshed through ``onupdate``.
    created_at: Any = Column(DateTime, default=datetime.utcnow)
    updated_at: Any = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Version counter, starting at 1.
    version: Any = Column(Integer, default=1)

    # New rows default to MODIFIED.
    sync_status: Any = Column(
        SQLAlchemyEnum(SyncStatus),
        default=SyncStatus.MODIFIED,
    )
class Database:
    """
    Local SQLite item store with optional synchronisation to PostgreSQL.

    Items live in the SQLite file named by ``SQLITE_FILE_NAME``. Every local
    change is tagged with a :class:`SyncStatus`, which
    :meth:`sync_with_postgresql` uses to decide what has to be pushed to the
    remote database. The PostgreSQL connection parameters are read from the
    ``DB_HOST``, ``DB_NAME``, ``DB_USER`` and ``DB_PASSWORD`` environment
    variables.
    """

    def __init__(self):
        """Create the SQLite engine, the tables and the session factory."""
        self.local_engine = create_engine(f"sqlite:///{SQLITE_FILE_NAME}")
        Base.metadata.create_all(self.local_engine)
        self.Session = sessionmaker(bind=self.local_engine)

    @staticmethod
    def _find_item(session, item_id):
        """Return the item with primary key ``item_id`` or ``None``."""
        return session.query(Item).filter_by(id=item_id).first()

    def add_item(self, name, description):
        """
        Insert a new item with the given name and description.

        The remaining columns (id, timestamps, version, sync status) are
        filled in from the column defaults declared on :class:`Item`.
        """
        # The context manager closes the session even if commit() raises.
        with self.Session() as session:
            session.add(Item(name=name, description=description))
            session.commit()

    def update_item(self, item_id, name, description):
        """
        Overwrite name/description of an item and queue it for upload.

        The version counter is incremented and the status set to
        ``MODIFIED``. Unknown ids are ignored.
        """
        with self.Session() as session:
            item = self._find_item(session, item_id)
            if item is not None:
                item.name = name
                item.description = description
                item.version += 1
                item.sync_status = SyncStatus.MODIFIED
            session.commit()

    def set_conflict(self, item_id):
        """Mark the item as ``CONFLICT``. Unknown ids are ignored."""
        with self.Session() as session:
            item = self._find_item(session, item_id)
            if item is not None:
                item.sync_status = SyncStatus.CONFLICT
            session.commit()

    def delete_item(self, item_id):
        """
        Soft-delete an item by marking it as ``DELETED``.

        The row is kept as a tombstone so that the next synchronisation can
        forward the deletion to PostgreSQL. Unknown ids are ignored.
        """
        with self.Session() as session:
            item = self._find_item(session, item_id)
            if item is not None:
                item.sync_status = SyncStatus.DELETED
            session.commit()

    def get_items(self):
        """
        Return all items that are not marked as ``DELETED``.

        The returned objects are detached: the session that loaded them is
        already closed when this method returns.
        """
        with self.Session() as session:
            return (
                session.query(Item)
                .filter(Item.sync_status != SyncStatus.DELETED)
                .all()
            )

    def is_online(self):
        """
        Return ``True`` if a test HTTP request succeeds, else ``False``.

        Any ``requests`` failure counts as offline. This includes read
        timeouts, which are not ``requests.ConnectionError`` subclasses and
        would otherwise escape to the caller.
        """
        try:
            requests.get("https://www.google.com", timeout=5)
        except requests.RequestException:
            return False
        return True

    def sync_with_postgresql(self):
        """
        Push local changes to PostgreSQL, then pull items missing locally.

        Does nothing (apart from printing a notice) when :meth:`is_online`
        reports no connectivity. Errors raised while talking to PostgreSQL
        propagate to the caller; the remote transaction is rolled back and
        the connection is closed in every case.
        """
        if not self.is_online():
            print("Not online, can't sync with PostgreSQL")
            return

        conn = self._get_pg_connection()
        try:
            # ``with conn`` only delimits a transaction (commit on success,
            # rollback on error); psycopg2 leaves the connection open, so
            # it is closed explicitly below.
            with conn, conn.cursor() as cur:
                self._create_table_if_not_exists(cur)
                self._sync_local_to_remote(cur)
                self._sync_remote_to_local(cur)
        finally:
            conn.close()
        print("Sync with PostgreSQL completed successfully")

    def resolve_conflict(self, item_id, resolution_choice):
        """
        Resolve a conflict on ``item_id`` in favour of one side.

        Only items currently marked ``CONFLICT`` are touched.

        ``resolution_choice``:
            ``"local"``  - re-queue the local copy for upload (``MODIFIED``).
            ``"remote"`` - not implemented yet; the item stays in conflict.
            Any other value is ignored.
        """
        # TODO: make this functionality more robust
        with self.Session() as session:
            item = self._find_item(session, item_id)
            if item is not None and item.sync_status == SyncStatus.CONFLICT:
                if resolution_choice == "local":
                    # TODO: also raise the local version above the remote
                    # one, e.g. max(remote_version + 1, item.version).
                    # As long as the remote version is higher,
                    # _check_for_conflict() flags the item again on the
                    # next sync.
                    item.sync_status = SyncStatus.MODIFIED
                elif resolution_choice == "remote":
                    # TODO: fetch the latest version from PostgreSQL and
                    # overwrite the local row with it.
                    pass
            session.commit()

    def _get_pg_connection(self):
        """Open a psycopg2 connection configured from ``DB_*`` env vars."""
        return psycopg2.connect(
            host=os.getenv("DB_HOST"),
            database=os.getenv("DB_NAME"),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
        )

    def _create_table_if_not_exists(self, cur):
        """Run ``SQL_CREATE_TABLE`` on the remote cursor."""
        cur.execute(SQL_CREATE_TABLE)

    def _sync_local_to_remote(self, cur):
        """
        Upload ``MODIFIED`` items and forward ``DELETED`` ones.

        Pending rows are queried directly: :meth:`get_items` hides deleted
        rows, so using it here would never propagate a deletion. Status
        changes are made on session-attached objects and committed, so an
        uploaded item really ends up ``SYNCED`` locally.
        """
        pending_states = (SyncStatus.MODIFIED, SyncStatus.DELETED)
        with self.Session() as session:
            pending = (
                session.query(Item)
                .filter(Item.sync_status.in_(pending_states))
                .all()
            )
            if not pending:
                return

            for item in pending:
                if item.sync_status == SyncStatus.MODIFIED:
                    self._handle_modified_item(cur, item)
                else:
                    self._handle_deleted_item(cur, item)
                    # The tombstone has done its job once the remote
                    # delete is committed.
                    session.delete(item)

            # Commit remotely first. If that fails, the local session is
            # discarded without a commit and everything stays pending for
            # the next attempt, instead of being marked SYNCED without
            # having reached the server.
            cur.connection.commit()
            session.commit()

    def _handle_modified_item(self, cur, item):
        """
        Upload one modified item, or flag it as conflicting.

        ``item`` must be attached to a session that the caller commits;
        this method only changes ``item.sync_status`` in memory.
        """
        if self._check_for_conflict(cur, item):
            item.sync_status = SyncStatus.CONFLICT
        else:
            self._update_or_insert_item(cur, item)
            item.sync_status = SyncStatus.SYNCED

    def _check_for_conflict(self, cur, item):
        """
        Return ``True`` if the remote row is newer than ``item``.

        Runs ``SQL_CHECK_FOR_CONFLICTS`` with the item id and compares the
        first column of the result with ``item.version``. No remote row
        means no conflict.
        """
        cur.execute(SQL_CHECK_FOR_CONFLICTS, (item.id,))
        row = cur.fetchone()
        return row is not None and row[0] > item.version

    def _update_or_insert_item(self, cur, item):
        """Run ``SQL_UPDATE_OR_INSERT_ITEM`` with the item's column values."""
        cur.execute(
            SQL_UPDATE_OR_INSERT_ITEM,
            (
                item.id,
                item.name,
                item.description,
                item.created_at,
                item.updated_at,
                item.version,
                SyncStatus.SYNCED.value,
            ),
        )

    def _handle_deleted_item(self, cur, item):
        """Run ``SQL_DELETE_ITEM`` for the item's id on the remote cursor."""
        cur.execute(SQL_DELETE_ITEM, (item.id,))

    def _sync_remote_to_local(self, cur):
        """
        Copy remote items that do not exist locally into SQLite.

        Rows whose id is already present locally are left untouched.
        """
        cur.execute(SQL_FETCH_ITEMS)
        pg_items = cur.fetchall()
        with self.Session() as session:
            for pg_item in pg_items:
                if self._find_item(session, pg_item[0]) is None:
                    self._add_remote_item_to_local(session, pg_item)
            session.commit()

    def _add_remote_item_to_local(self, session, pg_item):
        """
        Add one remote row to ``session`` as a ``SYNCED`` item.

        ``pg_item`` is read positionally: id, name, description,
        created_at, updated_at, version.
        """
        session.add(
            Item(
                id=pg_item[0],
                name=pg_item[1],
                description=pg_item[2],
                created_at=pg_item[3],
                updated_at=pg_item[4],
                version=pg_item[5],
                sync_status=SyncStatus.SYNCED,
            )
        )