437 lines
17 KiB
Python
437 lines
17 KiB
Python
"""APIClient - Agent - Storage layer (SQLite)."""
|
|
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from app.models import HttpRequest, Environment, MockEndpoint
|
|
|
|
# Location of the SQLite database file: ~/.ekika-api-client/data.db
DB_PATH = Path.home() / ".ekika-api-client" / "data.db"
|
|
|
|
|
|
def _get_conn() -> sqlite3.Connection:
    """Open and configure a new connection to the application database.

    Creates the parent directory of ``DB_PATH`` when it does not exist,
    installs ``sqlite3.Row`` as the row factory (so columns can be read by
    name), switches the journal mode to WAL and enables foreign-key
    enforcement.

    Returns:
        A fresh connection. The caller owns it and is responsible for
        closing it.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
    except sqlite3.Error:
        # Do not leak the handle when configuring the new connection fails.
        conn.close()
        raise
    return conn
|
|
|
|
|
|
def _migrate(conn: sqlite3.Connection):
|
|
"""Add columns/tables introduced after initial schema."""
|
|
existing = {row[1] for row in conn.execute("PRAGMA table_info(requests)")}
|
|
migrations = [
|
|
("folder_id", "ALTER TABLE requests ADD COLUMN folder_id INTEGER"),
|
|
("params", "ALTER TABLE requests ADD COLUMN params TEXT"),
|
|
("body_type", "ALTER TABLE requests ADD COLUMN body_type TEXT DEFAULT 'raw'"),
|
|
("auth_type", "ALTER TABLE requests ADD COLUMN auth_type TEXT DEFAULT 'none'"),
|
|
("auth_data", "ALTER TABLE requests ADD COLUMN auth_data TEXT"),
|
|
("pre_request_script", "ALTER TABLE requests ADD COLUMN pre_request_script TEXT"),
|
|
("test_script", "ALTER TABLE requests ADD COLUMN test_script TEXT"),
|
|
("timeout", "ALTER TABLE requests ADD COLUMN timeout INTEGER DEFAULT 30"),
|
|
("ssl_verify", "ALTER TABLE requests ADD COLUMN ssl_verify INTEGER DEFAULT 1"),
|
|
("content_type", "ALTER TABLE requests ADD COLUMN content_type TEXT"),
|
|
("created_at", "ALTER TABLE requests ADD COLUMN created_at DATETIME DEFAULT CURRENT_TIMESTAMP"),
|
|
]
|
|
for col, sql in migrations:
|
|
if col not in existing:
|
|
conn.execute(sql)
|
|
|
|
hist_cols = {row[1] for row in conn.execute("PRAGMA table_info(history)")}
|
|
hist_migrations = [
|
|
("params", "ALTER TABLE history ADD COLUMN params TEXT"),
|
|
("body_type", "ALTER TABLE history ADD COLUMN body_type TEXT"),
|
|
("auth_type", "ALTER TABLE history ADD COLUMN auth_type TEXT"),
|
|
("auth_data", "ALTER TABLE history ADD COLUMN auth_data TEXT"),
|
|
("timeout", "ALTER TABLE history ADD COLUMN timeout INTEGER DEFAULT 30"),
|
|
("ssl_verify","ALTER TABLE history ADD COLUMN ssl_verify INTEGER DEFAULT 1"),
|
|
]
|
|
for col, sql in hist_migrations:
|
|
if col not in hist_cols:
|
|
conn.execute(sql)
|
|
|
|
|
|
def init_db():
    """Create the database schema if needed and bring older databases up to date.

    Creates the ``collections``, ``folders``, ``requests``, ``history``,
    ``environments``, ``mock_endpoints`` and ``settings`` tables when they do
    not exist, applies column migrations, and then creates the lookup indexes.
    """
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("""
                CREATE TABLE IF NOT EXISTS collections (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS folders (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    collection_id INTEGER NOT NULL,
                    name TEXT NOT NULL,
                    FOREIGN KEY (collection_id) REFERENCES collections(id)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS requests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    collection_id INTEGER,
                    folder_id INTEGER,
                    name TEXT,
                    method TEXT,
                    url TEXT,
                    headers TEXT,
                    params TEXT,
                    body TEXT,
                    body_type TEXT DEFAULT 'raw',
                    content_type TEXT,
                    auth_type TEXT DEFAULT 'none',
                    auth_data TEXT,
                    pre_request_script TEXT,
                    test_script TEXT,
                    timeout INTEGER DEFAULT 30,
                    ssl_verify INTEGER DEFAULT 1,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (collection_id) REFERENCES collections(id),
                    FOREIGN KEY (folder_id) REFERENCES folders(id)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    method TEXT,
                    url TEXT,
                    headers TEXT,
                    params TEXT,
                    body TEXT,
                    body_type TEXT,
                    auth_type TEXT,
                    auth_data TEXT,
                    timeout INTEGER DEFAULT 30,
                    ssl_verify INTEGER DEFAULT 1,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS environments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    variables TEXT DEFAULT '{}',
                    is_active INTEGER DEFAULT 0
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS mock_endpoints (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT,
                    method TEXT DEFAULT 'GET',
                    path TEXT NOT NULL,
                    status_code INTEGER DEFAULT 200,
                    response_headers TEXT DEFAULT '{}',
                    response_body TEXT DEFAULT ''
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS settings (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            """)

            # Migrate BEFORE creating indexes: on a database created with an
            # older schema, columns such as requests.folder_id only exist
            # after the migration, and indexing a missing column fails.
            _migrate(conn)

            # Performance indexes
            conn.execute("CREATE INDEX IF NOT EXISTS idx_requests_collection ON requests(collection_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_requests_folder ON requests(folder_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_history_created ON history(created_at DESC)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_requests_url ON requests(url)")
    finally:
        # The sqlite3 connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
# ── Collections ──────────────────────────────────────────────────────────────
|
|
|
|
def get_collections() -> list[dict]:
    """Return all collections as dicts, ordered by name (case-insensitive)."""
    conn = _get_conn()
    try:
        rows = conn.execute("SELECT * FROM collections ORDER BY name COLLATE NOCASE").fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
|
|
def add_collection(name: str) -> int:
    """Insert a new collection called *name* and return its row id."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            cur = conn.execute("INSERT INTO collections (name) VALUES (?)", (name,))
        return cur.lastrowid
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def rename_collection(col_id: int, name: str):
    """Set the name of the collection with id *col_id* to *name*."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("UPDATE collections SET name=? WHERE id=?", (name, col_id))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def delete_collection(col_id: int):
    """Delete a collection together with its requests and folders.

    Rows are removed child-first (requests, then folders, then the
    collection) within a single transaction, so rows referencing the
    collection are gone before the collection itself is deleted.
    """
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("DELETE FROM requests WHERE collection_id=?", (col_id,))
            conn.execute("DELETE FROM folders WHERE collection_id=?", (col_id,))
            conn.execute("DELETE FROM collections WHERE id=?", (col_id,))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
# ── Folders ───────────────────────────────────────────────────────────────────
|
|
|
|
def get_folders(collection_id: int) -> list[dict]:
    """Return the folders of *collection_id* as dicts, ordered by name (case-insensitive)."""
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM folders WHERE collection_id=? ORDER BY name COLLATE NOCASE",
            (collection_id,),
        ).fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [dict(r) for r in rows]
|
|
|
|
|
|
def add_folder(collection_id: int, name: str) -> int:
    """Create a folder called *name* inside *collection_id* and return its row id."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            cur = conn.execute(
                "INSERT INTO folders (collection_id, name) VALUES (?,?)", (collection_id, name)
            )
        return cur.lastrowid
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def rename_folder(folder_id: int, name: str):
    """Set the name of the folder with id *folder_id* to *name*."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("UPDATE folders SET name=? WHERE id=?", (name, folder_id))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def delete_folder(folder_id: int):
    """Delete a folder and every request stored in it.

    The requests are removed first so that nothing references the folder
    when it is deleted; both statements run in one transaction.
    """
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("DELETE FROM requests WHERE folder_id=?", (folder_id,))
            conn.execute("DELETE FROM folders WHERE id=?", (folder_id,))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
# ── Requests ──────────────────────────────────────────────────────────────────
|
|
|
|
def _deserialize_request(r) -> dict:
|
|
d = dict(r)
|
|
d["headers"] = json.loads(d.get("headers") or "{}")
|
|
d["params"] = json.loads(d.get("params") or "{}")
|
|
d["auth_data"]= json.loads(d.get("auth_data")or "{}")
|
|
return d
|
|
|
|
|
|
def get_requests(collection_id: int, folder_id: int | None = None) -> list[dict]:
    """Return the requests of a collection at one folder level.

    Args:
        collection_id: Collection to read from.
        folder_id: When given, only requests inside that folder are returned;
            when ``None``, only requests that are not in any folder.

    Returns:
        Deserialized request dicts ordered by name (case-insensitive).
    """
    conn = _get_conn()
    try:
        if folder_id is not None:
            rows = conn.execute(
                "SELECT * FROM requests WHERE collection_id=? AND folder_id=? ORDER BY name COLLATE NOCASE",
                (collection_id, folder_id),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM requests WHERE collection_id=? AND folder_id IS NULL ORDER BY name COLLATE NOCASE",
                (collection_id,),
            ).fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [_deserialize_request(r) for r in rows]
|
|
|
|
|
|
def get_all_requests(collection_id: int) -> list[dict]:
    """Return every request of *collection_id*, regardless of folder.

    Returns:
        Deserialized request dicts ordered by name (case-insensitive).
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM requests WHERE collection_id=? ORDER BY name COLLATE NOCASE",
            (collection_id,),
        ).fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [_deserialize_request(r) for r in rows]
|
|
|
|
|
|
def save_request(collection_id: int, req: HttpRequest, folder_id: int | None = None) -> int:
    """Insert *req* as a new saved request and return its row id.

    Args:
        collection_id: Collection the request belongs to.
        req: Request to store; ``headers``, ``params`` and ``auth_data`` are
            serialized to JSON and ``ssl_verify`` is stored as an integer.
        folder_id: Optional folder inside the collection.
    """
    values = (
        collection_id, folder_id, req.name, req.method, req.url,
        json.dumps(req.headers), json.dumps(req.params),
        req.body, req.body_type, req.content_type,
        req.auth_type, json.dumps(req.auth_data),
        req.pre_request_script, req.test_script,
        req.timeout, int(req.ssl_verify),
    )
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            cur = conn.execute(
                """INSERT INTO requests
                (collection_id, folder_id, name, method, url, headers, params,
                body, body_type, content_type, auth_type, auth_data,
                pre_request_script, test_script, timeout, ssl_verify)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                values,
            )
        return cur.lastrowid
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def update_request(req_id: int, req: HttpRequest):
    """Overwrite the stored request *req_id* with the contents of *req*.

    The collection and folder of the row are left unchanged. ``headers``,
    ``params`` and ``auth_data`` are serialized to JSON and ``ssl_verify``
    is stored as an integer.
    """
    values = (
        req.name, req.method, req.url,
        json.dumps(req.headers), json.dumps(req.params),
        req.body, req.body_type, req.content_type,
        req.auth_type, json.dumps(req.auth_data),
        req.pre_request_script, req.test_script,
        req.timeout, int(req.ssl_verify), req_id,
    )
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """UPDATE requests SET
                name=?, method=?, url=?, headers=?, params=?,
                body=?, body_type=?, content_type=?, auth_type=?, auth_data=?,
                pre_request_script=?, test_script=?, timeout=?, ssl_verify=?
                WHERE id=?""",
                values,
            )
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def delete_request(req_id: int):
    """Delete the saved request with id *req_id*."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("DELETE FROM requests WHERE id=?", (req_id,))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def search_requests(query: str) -> list[dict]:
    """Find saved requests whose name or URL contains *query*.

    The query is matched literally as a substring: ``%`` and ``_`` typed by
    the user are escaped so they are not interpreted as LIKE wildcards.

    Returns:
        Deserialized request dicts, each with an extra ``collection_name``
        key, ordered by request name (case-insensitive).
    """
    # Escape the escape character first, then the two LIKE wildcards.
    escaped = query.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    like = f"%{escaped}%"
    conn = _get_conn()
    try:
        rows = conn.execute(
            """SELECT r.*, c.name as collection_name
            FROM requests r
            LEFT JOIN collections c ON r.collection_id = c.id
            WHERE r.name LIKE ? ESCAPE '\\' OR r.url LIKE ? ESCAPE '\\'
            ORDER BY r.name COLLATE NOCASE""",
            (like, like),
        ).fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [_deserialize_request(r) for r in rows]
|
|
|
|
|
|
# ── History ───────────────────────────────────────────────────────────────────
|
|
|
|
def add_to_history(req: HttpRequest):
    """Record *req* in the history table, keeping at most 200 entries.

    Older entries are trimmed first and the new one is inserted in the same
    transaction. ``headers``, ``params`` and ``auth_data`` are serialized to
    JSON and ``ssl_verify`` is stored as an integer.
    """
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            # Keep the 199 newest rows so the insert below brings the total
            # to 200. created_at has one-second resolution, so id breaks ties
            # to make "newest" deterministic.
            conn.execute(
                """DELETE FROM history WHERE id NOT IN (
                    SELECT id FROM history ORDER BY created_at DESC, id DESC LIMIT 199
                )"""
            )
            conn.execute(
                """INSERT INTO history
                (method, url, headers, params, body, body_type, auth_type, auth_data, timeout, ssl_verify)
                VALUES (?,?,?,?,?,?,?,?,?,?)""",
                (req.method, req.url,
                 json.dumps(req.headers), json.dumps(req.params),
                 req.body, req.body_type, req.auth_type, json.dumps(req.auth_data),
                 req.timeout, int(req.ssl_verify)),
            )
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def get_history(limit: int = 50) -> list[dict]:
    """Return up to *limit* history entries, newest first.

    Entries created within the same second are ordered by descending id so
    the result order is deterministic.
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM history ORDER BY created_at DESC, id DESC LIMIT ?", (limit,)
        ).fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [_deserialize_request(r) for r in rows]
|
|
|
|
|
|
def clear_history():
    """Delete every entry from the history table."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("DELETE FROM history")
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
# ── Environments ──────────────────────────────────────────────────────────────
|
|
|
|
def get_environments() -> list[Environment]:
    """Return all environments, ordered by name (case-insensitive).

    The stored ``variables`` JSON is decoded (empty/NULL becomes ``{}``) and
    ``is_active`` is converted to ``bool``.
    """
    conn = _get_conn()
    try:
        rows = conn.execute("SELECT * FROM environments ORDER BY name COLLATE NOCASE").fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [
        Environment(
            id=r["id"], name=r["name"],
            variables=json.loads(r["variables"] or "{}"),
            is_active=bool(r["is_active"]),
        )
        for r in rows
    ]
|
|
|
|
|
|
def get_active_environment() -> Environment | None:
    """Return the environment flagged as active, or ``None`` if there is none."""
    conn = _get_conn()
    try:
        row = conn.execute("SELECT * FROM environments WHERE is_active=1").fetchone()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    if row:
        return Environment(
            id=row["id"], name=row["name"],
            variables=json.loads(row["variables"] or "{}"),
            is_active=True,
        )
    return None
|
|
|
|
|
|
def save_environment(env: Environment) -> int:
    """Insert or update *env* and return its row id.

    When ``env.id`` is set, the name and variables of that row are updated
    (its ``is_active`` flag is left as stored). Otherwise a new row is
    inserted with the name, variables and ``is_active`` flag of *env*.
    """
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            if env.id:
                conn.execute(
                    "UPDATE environments SET name=?, variables=? WHERE id=?",
                    (env.name, json.dumps(env.variables), env.id),
                )
                return env.id
            cur = conn.execute(
                "INSERT INTO environments (name, variables, is_active) VALUES (?,?,?)",
                (env.name, json.dumps(env.variables), int(env.is_active)),
            )
            return cur.lastrowid
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def set_active_environment(env_id: int | None):
    """Make *env_id* the only active environment.

    All environments are deactivated first; passing ``None`` therefore
    leaves no environment active. Both updates run in one transaction.
    """
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("UPDATE environments SET is_active=0")
            if env_id is not None:
                conn.execute("UPDATE environments SET is_active=1 WHERE id=?", (env_id,))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def delete_environment(env_id: int):
    """Delete the environment with id *env_id*."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("DELETE FROM environments WHERE id=?", (env_id,))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
# ── Mock Endpoints ────────────────────────────────────────────────────────────
|
|
|
|
def get_mock_endpoints() -> list[MockEndpoint]:
    """Return all mock endpoints, ordered by path.

    The stored ``response_headers`` JSON is decoded (empty/NULL becomes
    ``{}``) and a NULL ``response_body`` becomes an empty string.
    """
    conn = _get_conn()
    try:
        rows = conn.execute("SELECT * FROM mock_endpoints ORDER BY path").fetchall()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return [
        MockEndpoint(
            id=r["id"], name=r["name"], method=r["method"],
            path=r["path"], status_code=r["status_code"],
            response_headers=json.loads(r["response_headers"] or "{}"),
            response_body=r["response_body"] or "",
        )
        for r in rows
    ]
|
|
|
|
|
|
def save_mock_endpoint(ep: MockEndpoint) -> int:
    """Insert or update the mock endpoint *ep* and return its row id.

    When ``ep.id`` is set that row is updated; otherwise a new row is
    inserted. ``response_headers`` is serialized to JSON.
    """
    fields = (
        ep.name, ep.method, ep.path, ep.status_code,
        json.dumps(ep.response_headers), ep.response_body,
    )
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            if ep.id:
                conn.execute(
                    """UPDATE mock_endpoints
                    SET name=?, method=?, path=?, status_code=?, response_headers=?, response_body=?
                    WHERE id=?""",
                    (*fields, ep.id),
                )
                return ep.id
            cur = conn.execute(
                """INSERT INTO mock_endpoints
                (name, method, path, status_code, response_headers, response_body)
                VALUES (?,?,?,?,?,?)""",
                fields,
            )
            return cur.lastrowid
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
def delete_mock_endpoint(ep_id: int):
    """Delete the mock endpoint with id *ep_id*."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute("DELETE FROM mock_endpoints WHERE id=?", (ep_id,))
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|
|
|
|
|
|
# ── Settings ──────────────────────────────────────────────────────────────────
|
|
|
|
def get_setting(key: str, default: str = "") -> str:
    """Return the stored value for *key*, or *default* when the key is absent."""
    conn = _get_conn()
    try:
        row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
    finally:
        # Close explicitly: sqlite3's context manager would only commit/roll back.
        conn.close()
    return row["value"] if row else default
|
|
|
|
|
|
def set_setting(key: str, value: str):
    """Store *value* under *key*, replacing any existing value (upsert)."""
    conn = _get_conn()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO settings (key, value) VALUES (?,?) "
                "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
                (key, value),
            )
    finally:
        # The connection context manager does not close the connection.
        conn.close()
|