# db.py (7.43 kB)
"""SQLite database operations module."""
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
class SQLiteDatabase:
    """Manages a single SQLite database connection and common operations on it.

    The manager holds at most one ``sqlite3.Connection`` in ``self.connection``.
    Every operation other than ``open`` requires an open connection.

    All failures are reported as plain ``Exception`` instances whose message
    starts with an operation-specific prefix (for example ``"Insert failed: "``);
    the underlying error is attached as ``__cause__``.

    NOTE: table names, column names, schemas and WHERE clauses are interpolated
    into the SQL text verbatim. Only *values* are bound as parameters, so the
    identifier/clause arguments must come from a trusted source.
    """

    def __init__(self):
        """Initialize the database manager with no open connection."""
        self.connection: Optional[sqlite3.Connection] = None

    def open(self, db_path: str) -> str:
        """
        Open or create a SQLite database.

        The parent directory of ``db_path`` is created if it is missing. If a
        database is already open, it is closed once the new connection has
        been established, so a failed open leaves the previous one in place.

        Args:
            db_path: Path to the database file

        Returns:
            Success message

        Raises:
            Exception: If the directory or the database cannot be opened.
        """
        try:
            Path(db_path).parent.mkdir(parents=True, exist_ok=True)
            new_connection = sqlite3.connect(db_path)
            # Rows behave like mappings so they can be turned into dicts.
            new_connection.row_factory = sqlite3.Row
            # Swap first, then close the old handle so it is never leaked.
            previous, self.connection = self.connection, new_connection
            if previous is not None:
                previous.close()
            return f"Database opened successfully: {db_path}"
        except Exception as e:
            raise Exception(f"Failed to open database: {str(e)}") from e

    def close(self) -> str:
        """
        Close the database connection.

        Returns:
            Success message

        Raises:
            Exception: If no database is currently open.
        """
        if self.connection is None:
            raise Exception("No database is currently open")
        self.connection.close()
        self.connection = None
        return "Database closed successfully"

    def _ensure_connected(self) -> None:
        """Raise an Exception unless a database connection is open."""
        if self.connection is None:
            raise Exception("No database is open. Please open a database first.")

    def execute_query(
        self, query: str, parameters: Optional[List[Any]] = None
    ) -> Dict[str, Any]:
        """
        Execute a SELECT query and return results.

        The statement is executed as given and is not committed.

        Args:
            query: SQL SELECT query
            parameters: Query parameters for prepared statements

        Returns:
            Dictionary with ``rows`` (a list of column-name -> value dicts)
            and ``column_count`` (number of columns the statement produces,
            also reported when the result set is empty).

        Raises:
            Exception: If no database is open or the query fails.
        """
        self._ensure_connected()
        try:
            cursor = self.connection.cursor()
            cursor.execute(query, parameters or [])
            result_rows = [dict(row) for row in cursor.fetchall()]
            # Take the count from the cursor metadata rather than the first
            # row: it is available for empty results and is not reduced when
            # two columns share a name. It is None for non-query statements.
            description = cursor.description
            column_count = len(description) if description else 0
            return {"rows": result_rows, "column_count": column_count}
        except Exception as e:
            raise Exception(f"Query execution failed: {str(e)}") from e

    def insert(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Insert a row into a table and commit.

        Args:
            table: Table name
            data: Dictionary of column names and values. An empty dictionary
                inserts a row made entirely of column defaults.

        Returns:
            Dictionary with lastID and changes count

        Raises:
            Exception: If no database is open or the insert fails (the
                transaction is rolled back first).
        """
        self._ensure_connected()
        try:
            columns = list(data.keys())
            values = list(data.values())
            if columns:
                placeholders = ", ".join(["?"] * len(columns))
                query = (
                    f"INSERT INTO {table} ({', '.join(columns)}) "
                    f"VALUES ({placeholders})"
                )
            else:
                # "INSERT INTO t () VALUES ()" is not valid SQLite syntax.
                query = f"INSERT INTO {table} DEFAULT VALUES"
            cursor = self.connection.cursor()
            cursor.execute(query, values)
            self.connection.commit()
            return {"lastID": cursor.lastrowid, "changes": cursor.rowcount}
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Insert failed: {str(e)}") from e

    def update(
        self,
        table: str,
        data: Dict[str, Any],
        where: str,
        where_params: Optional[List[Any]] = None,
    ) -> Dict[str, Any]:
        """
        Update rows in a table and commit.

        Args:
            table: Table name
            data: Dictionary of column names and new values (must not be empty)
            where: WHERE clause condition
            where_params: Parameters for WHERE clause

        Returns:
            Dictionary with changes count

        Raises:
            Exception: If no database is open, ``data`` is empty, or the
                update fails (the transaction is rolled back first).
        """
        self._ensure_connected()
        try:
            if not data:
                # Without this the statement would read "SET  WHERE ..." and
                # fail with an unhelpful syntax error.
                raise ValueError("no columns provided to update")
            set_clause = ", ".join([f"{col} = ?" for col in data])
            query = f"UPDATE {table} SET {set_clause} WHERE {where}"
            # SET values come first, then WHERE values, matching placeholder
            # order. Unpacking accepts any sequence type for where_params.
            bound_values = [*data.values(), *(where_params or [])]
            cursor = self.connection.cursor()
            cursor.execute(query, bound_values)
            self.connection.commit()
            return {"changes": cursor.rowcount}
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Update failed: {str(e)}") from e

    def delete(
        self,
        table: str,
        where: str,
        where_params: Optional[List[Any]] = None,
    ) -> Dict[str, Any]:
        """
        Delete rows from a table and commit.

        Args:
            table: Table name
            where: WHERE clause condition
            where_params: Parameters for WHERE clause

        Returns:
            Dictionary with changes count

        Raises:
            Exception: If no database is open or the delete fails (the
                transaction is rolled back first).
        """
        self._ensure_connected()
        try:
            query = f"DELETE FROM {table} WHERE {where}"
            cursor = self.connection.cursor()
            cursor.execute(query, where_params or [])
            self.connection.commit()
            return {"changes": cursor.rowcount}
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Delete failed: {str(e)}") from e

    def create_table(self, table: str, schema: str) -> str:
        """
        Create a new table and commit.

        Args:
            table: Table name
            schema: Column definitions

        Returns:
            Success message

        Raises:
            Exception: If no database is open or the table cannot be created.
        """
        self._ensure_connected()
        try:
            query = f"CREATE TABLE {table} ({schema})"
            cursor = self.connection.cursor()
            cursor.execute(query)
            self.connection.commit()
            return f"Table '{table}' created successfully"
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Table creation failed: {str(e)}") from e

    def list_tables(self) -> Dict[str, List[str]]:
        """
        List all tables in the database.

        Tables whose name starts with ``sqlite_`` are excluded.

        Returns:
            Dictionary with list of table names

        Raises:
            Exception: If no database is open or the lookup fails.
        """
        self._ensure_connected()
        try:
            query = "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
            cursor = self.connection.cursor()
            cursor.execute(query)
            tables = [row[0] for row in cursor.fetchall()]
            return {"tables": tables}
        except Exception as e:
            raise Exception(f"Failed to list tables: {str(e)}") from e

    def get_table_schema(self, table: str) -> Dict[str, List[Dict[str, Any]]]:
        """
        Get the schema of a table.

        Args:
            table: Table name

        Returns:
            Dictionary with column information: one dict per row returned by
            ``PRAGMA table_info``, with the keys ``cid``, ``name``, ``type``,
            ``notnull``, ``dflt_value`` and ``pk``.

        Raises:
            Exception: If no database is open or the lookup fails.
        """
        self._ensure_connected()
        try:
            query = f"PRAGMA table_info({table})"
            cursor = self.connection.cursor()
            cursor.execute(query)
            columns = [
                {
                    "cid": row[0],
                    "name": row[1],
                    "type": row[2],
                    "notnull": row[3],
                    "dflt_value": row[4],
                    "pk": row[5],
                }
                for row in cursor.fetchall()
            ]
            return {"columns": columns}
        except Exception as e:
            raise Exception(f"Failed to get table schema: {str(e)}") from e