Initial commit

Photo-based book cataloger with AI identification.
Room → Cabinet → Shelf → Book hierarchy; FastAPI + SQLite backend;
vanilla JS SPA; OpenAI-compatible plugin system for boundary
detection, text recognition, and archive search.
This commit is contained in:
night
2026-03-09 14:11:11 +03:00
commit f29678ebf1
64 changed files with 8605 additions and 0 deletions

0
tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,239 @@
/**
* pure-functions.test.js
* Unit tests for pure / side-effect-free functions extracted from static/js/*.
*
* Strategy: use node:vm runInNewContext to execute each browser script in a
* fresh sandbox. Function declarations at the top level of a script become
* properties of the sandbox context object, which is what we assert against.
* Files that reference the DOM at load-time (photo.js) receive a minimal stub.
*/
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { readFileSync } from 'node:fs';
import { runInNewContext } from 'node:vm';
import { fileURLToPath } from 'node:url';
import { join, dirname } from 'node:path';
// Values returned from VM sandboxes live in a different V8 realm, so
// deepStrictEqual rejects them even when structurally identical.
// JSON round-trip moves them into the current realm before comparison.
const j = (v) => JSON.parse(JSON.stringify(v));
const ROOT = join(dirname(fileURLToPath(import.meta.url)), '..', '..');
/**
* Load a browser script into a fresh VM sandbox and return the sandbox.
* A minimal DOM stub is merged with `extra` so top-level DOM calls don't throw.
*/
function load(relPath, extra = {}) {
const code = readFileSync(join(ROOT, relPath), 'utf8');
const el = {
textContent: '', value: '', files: [], style: {},
classList: { add() {}, remove() {} },
setAttribute() {}, removeAttribute() {}, click() {}, addEventListener() {},
};
const ctx = {
document: { getElementById: () => el, querySelector: () => null, querySelectorAll: () => [] },
window: { innerWidth: 800 },
navigator: { userAgent: '' },
clearTimeout() {}, setTimeout() {},
...extra,
};
runInNewContext(code, ctx);
return ctx;
}
// ── esc (helpers.js) ─────────────────────────────────────────────────────────
test('esc: escapes HTML special characters', () => {
const { esc } = load('static/js/helpers.js');
assert.equal(esc('<b>text</b>'), '&lt;b&gt;text&lt;/b&gt;');
assert.equal(esc('"quoted"'), '&quot;quoted&quot;');
assert.equal(esc('a & b'), 'a &amp; b');
assert.equal(esc('<script>alert("xss")</script>'), '&lt;script&gt;alert(&quot;xss&quot;)&lt;/script&gt;');
});
test('esc: coerces null/undefined/number to string', () => {
const { esc } = load('static/js/helpers.js');
assert.equal(esc(null), '');
assert.equal(esc(undefined), '');
assert.equal(esc(42), '42');
});
// ── parseBounds (canvas-boundary.js) ─────────────────────────────────────────
test('parseBounds: parses valid JSON array of fractions', () => {
const { parseBounds } = load('static/js/canvas-boundary.js');
assert.deepEqual(j(parseBounds('[0.25, 0.5, 0.75]')), [0.25, 0.5, 0.75]);
assert.deepEqual(j(parseBounds('[]')), []);
});
test('parseBounds: returns [] for falsy / invalid / null-JSON input', () => {
const { parseBounds } = load('static/js/canvas-boundary.js');
assert.deepEqual(j(parseBounds(null)), []);
assert.deepEqual(j(parseBounds('')), []);
assert.deepEqual(j(parseBounds('not-json')), []);
assert.deepEqual(j(parseBounds('null')), []);
});
// ── parseBndPluginResults (canvas-boundary.js) ────────────────────────────────
test('parseBndPluginResults: parses a valid JSON object', () => {
const { parseBndPluginResults } = load('static/js/canvas-boundary.js');
assert.deepEqual(
j(parseBndPluginResults('{"p1":[0.3,0.6],"p2":[0.4]}')),
{ p1: [0.3, 0.6], p2: [0.4] }
);
});
test('parseBndPluginResults: returns {} for null / array / invalid input', () => {
const { parseBndPluginResults } = load('static/js/canvas-boundary.js');
assert.deepEqual(j(parseBndPluginResults(null)), {});
assert.deepEqual(j(parseBndPluginResults('')), {});
assert.deepEqual(j(parseBndPluginResults('[1,2,3]')), {}); // arrays are rejected
assert.deepEqual(j(parseBndPluginResults('{bad}')), {});
});
// ── parseCandidates (tree-render.js) ──────────────────────────────────────────
/** Load tree-render.js with stubs for all globals it references in function bodies. */
function loadTreeRender() {
return load('static/js/tree-render.js', {
S: { selected: null, expanded: new Set(), _loading: {} },
_plugins: [],
_batchState: { running: false, done: 0, total: 0 },
_bnd: null,
esc: (s) => String(s ?? ''),
isDesktop: () => true,
findNode: () => null,
vDetailBody: () => '',
});
}
test('parseCandidates: parses a valid JSON array', () => {
const { parseCandidates } = loadTreeRender();
const input = [{ title: 'Foo', author: 'Bar', source: 'vlm' }];
assert.deepEqual(j(parseCandidates(JSON.stringify(input))), input);
});
test('parseCandidates: returns [] for null / empty / invalid input', () => {
const { parseCandidates } = loadTreeRender();
assert.deepEqual(j(parseCandidates(null)), []);
assert.deepEqual(j(parseCandidates('')), []);
assert.deepEqual(j(parseCandidates('bad json')), []);
});
// ── getBookStats (tree-render.js) ─────────────────────────────────────────────
function makeBook(status) {
return { id: Math.random(), identification_status: status, title: 'T' };
}
test('getBookStats: counts books by status on a shelf', () => {
const { getBookStats } = loadTreeRender();
const shelf = {
id: 1,
books: [
makeBook('user_approved'),
makeBook('ai_identified'),
makeBook('unidentified'),
makeBook('unidentified'),
],
};
const s = getBookStats(shelf, 'shelf');
assert.equal(s.total, 4);
assert.equal(s.approved, 1);
assert.equal(s.ai, 1);
assert.equal(s.unidentified, 2);
});
test('getBookStats: aggregates across a full room → cabinet → shelf hierarchy', () => {
const { getBookStats } = loadTreeRender();
const room = {
id: 1,
cabinets: [{
id: 2,
shelves: [{
id: 3,
books: [makeBook('user_approved'), makeBook('unidentified'), makeBook('ai_identified')],
}],
}],
};
const s = getBookStats(room, 'room');
assert.equal(s.total, 3);
assert.equal(s.approved, 1);
assert.equal(s.ai, 1);
assert.equal(s.unidentified, 1);
});
test('getBookStats: returns zeros for a book node itself', () => {
const { getBookStats } = loadTreeRender();
const book = makeBook('user_approved');
const s = getBookStats(book, 'book');
assert.equal(s.total, 1);
assert.equal(s.approved, 1);
});
// ── collectQueueBooks (photo.js) ──────────────────────────────────────────────
function loadPhoto() {
return load('static/js/photo.js', {
S: { _photoTarget: null },
_photoQueue: null,
req: async () => ({}),
toast: () => {},
walkTree: () => {},
findNode: () => null,
isDesktop: () => true,
render: () => {},
});
}
test('collectQueueBooks: excludes user_approved books from a shelf', () => {
const { collectQueueBooks } = loadPhoto();
const shelf = {
id: 1,
books: [
{ id: 2, identification_status: 'user_approved', title: 'A' },
{ id: 3, identification_status: 'unidentified', title: 'B' },
{ id: 4, identification_status: 'ai_identified', title: 'C' },
],
};
const result = collectQueueBooks(shelf, 'shelf');
assert.equal(result.length, 2);
assert.deepEqual(j(result.map((b) => b.id)), [3, 4]);
});
test('collectQueueBooks: collects across room → cabinet → shelf hierarchy', () => {
const { collectQueueBooks } = loadPhoto();
const room = {
id: 1,
cabinets: [{
id: 2,
shelves: [{
id: 3,
books: [
{ id: 4, identification_status: 'user_approved' },
{ id: 5, identification_status: 'unidentified' },
{ id: 6, identification_status: 'ai_identified' },
],
}],
}],
};
const result = collectQueueBooks(room, 'room');
assert.equal(result.length, 2);
assert.deepEqual(j(result.map((b) => b.id)), [5, 6]);
});
test('collectQueueBooks: returns empty array when all books are approved', () => {
const { collectQueueBooks } = loadPhoto();
const shelf = {
id: 1,
books: [
{ id: 2, identification_status: 'user_approved' },
{ id: 3, identification_status: 'user_approved' },
],
};
assert.deepEqual(j(collectQueueBooks(shelf, 'shelf')), []);
});

190
tests/test_errors.py Normal file
View File

@@ -0,0 +1,190 @@
"""Tests for config and image error conditions, and exception attribute contracts."""
from pathlib import Path
import pytest
from errors import (
ConfigFileError,
ConfigNotLoadedError,
ConfigValidationError,
ImageFileNotFoundError,
ImageReadError,
)
from logic.images import crop_save, prep_img_b64, serve_crop
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_png(tmp_path: Path, filename: str = "img.png") -> Path:
"""Write a minimal 4x4 red PNG to tmp_path and return its path."""
from PIL import Image
path = tmp_path / filename
img = Image.new("RGB", (4, 4), color=(255, 0, 0))
img.save(path, format="PNG")
return path
def _make_corrupt(tmp_path: Path, filename: str = "bad.jpg") -> Path:
"""Write a file with invalid image bytes and return its path."""
path = tmp_path / filename
path.write_bytes(b"this is not an image\xff\xfe")
return path
# ── ImageFileNotFoundError ────────────────────────────────────────────────────
def test_prep_img_b64_file_not_found(tmp_path: Path) -> None:
missing = tmp_path / "missing.png"
with pytest.raises(ImageFileNotFoundError) as exc_info:
prep_img_b64(missing)
assert exc_info.value.path == missing
assert str(missing) in str(exc_info.value)
def test_crop_save_file_not_found(tmp_path: Path) -> None:
missing = tmp_path / "missing.png"
with pytest.raises(ImageFileNotFoundError) as exc_info:
crop_save(missing, 0, 0, 2, 2)
assert exc_info.value.path == missing
def test_serve_crop_file_not_found(tmp_path: Path) -> None:
missing = tmp_path / "missing.png"
with pytest.raises(ImageFileNotFoundError) as exc_info:
serve_crop(missing, None)
assert exc_info.value.path == missing
# ── ImageReadError ────────────────────────────────────────────────────────────
def test_prep_img_b64_corrupt_file(tmp_path: Path) -> None:
bad = _make_corrupt(tmp_path)
with pytest.raises(ImageReadError) as exc_info:
prep_img_b64(bad)
assert exc_info.value.path == bad
assert str(bad) in str(exc_info.value)
assert exc_info.value.reason # non-empty reason
def test_crop_save_corrupt_file(tmp_path: Path) -> None:
bad = _make_corrupt(tmp_path)
with pytest.raises(ImageReadError) as exc_info:
crop_save(bad, 0, 0, 2, 2)
assert exc_info.value.path == bad
def test_serve_crop_corrupt_file(tmp_path: Path) -> None:
bad = _make_corrupt(tmp_path)
with pytest.raises(ImageReadError) as exc_info:
serve_crop(bad, None)
assert exc_info.value.path == bad
# ── prep_img_b64 success path ─────────────────────────────────────────────────
def test_prep_img_b64_success(tmp_path: Path) -> None:
path = _make_png(tmp_path)
b64, mime = prep_img_b64(path)
assert mime == "image/png"
assert len(b64) > 0
def test_prep_img_b64_with_crop(tmp_path: Path) -> None:
path = _make_png(tmp_path)
b64, mime = prep_img_b64(path, crop_frac=(0.0, 0.0, 0.5, 0.5))
assert mime == "image/png"
assert len(b64) > 0
# ── Config exception attribute contracts ──────────────────────────────────────
def test_config_not_loaded_error() -> None:
exc = ConfigNotLoadedError()
assert "load_config" in str(exc)
def test_config_file_error() -> None:
path = Path("config/missing.yaml")
exc = ConfigFileError(path, "file not found")
assert exc.path == path
assert exc.reason == "file not found"
assert "missing.yaml" in str(exc)
assert "file not found" in str(exc)
def test_config_validation_error() -> None:
exc = ConfigValidationError("unexpected field 'foo'")
assert exc.reason == "unexpected field 'foo'"
assert "unexpected field" in str(exc)
# ── Config loading errors ─────────────────────────────────────────────────────
def test_load_config_raises_on_invalid_yaml(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
import config as config_module
cfg_dir = tmp_path / "config"
cfg_dir.mkdir()
(cfg_dir / "credentials.default.yaml").write_text(": invalid: yaml: {\n")
# write empty valid files for other categories
for cat in ["models", "functions", "ui"]:
(cfg_dir / f"{cat}.default.yaml").write_text(f"{cat}: {{}}\n")
monkeypatch.setattr(config_module, "_CONFIG_DIR", cfg_dir)
with pytest.raises(ConfigFileError) as exc_info:
config_module.load_config()
assert exc_info.value.path == cfg_dir / "credentials.default.yaml"
assert exc_info.value.reason
def test_load_config_raises_on_schema_mismatch(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
import config as config_module
cfg_dir = tmp_path / "config"
cfg_dir.mkdir()
# credentials expects CredentialConfig but we give it a non-dict value
(cfg_dir / "credentials.default.yaml").write_text("credentials:\n openrouter: not_a_dict\n")
for cat in ["models", "functions", "ui"]:
(cfg_dir / f"{cat}.default.yaml").write_text("")
monkeypatch.setattr(config_module, "_CONFIG_DIR", cfg_dir)
with pytest.raises(ConfigValidationError) as exc_info:
config_module.load_config()
assert exc_info.value.reason
def test_get_config_raises_if_not_loaded(monkeypatch: pytest.MonkeyPatch) -> None:
import config as config_module
# Clear the holder to simulate unloaded state
original = list(config_module.config_holder)
config_module.config_holder.clear()
try:
with pytest.raises(ConfigNotLoadedError):
config_module.get_config()
finally:
config_module.config_holder.extend(original)
# ── Image exception string representation ─────────────────────────────────────
def test_image_file_not_found_str() -> None:
exc = ImageFileNotFoundError(Path("/data/images/img.jpg"))
assert exc.path == Path("/data/images/img.jpg")
assert "img.jpg" in str(exc)
def test_image_read_error_str() -> None:
exc = ImageReadError(Path("/data/images/img.jpg"), "cannot identify image file")
assert exc.path == Path("/data/images/img.jpg")
assert exc.reason == "cannot identify image file"
assert "img.jpg" in str(exc)
assert "cannot identify image file" in str(exc)

585
tests/test_logic.py Normal file
View File

@@ -0,0 +1,585 @@
"""Unit tests for logic modules: boundary helpers, identification helpers, build_query, and all error conditions."""
import asyncio
from pathlib import Path
import pytest
import db as db_module
import logic
from errors import (
BookNotFoundError,
CabinetNotFoundError,
InvalidPluginEntityError,
NoCabinetPhotoError,
NoRawTextError,
NoShelfImageError,
PluginNotFoundError,
PluginTargetMismatchError,
ShelfNotFoundError,
)
from logic.archive import run_archive_searcher
from logic.boundaries import book_spine_source, bounds_for_index, run_boundary_detector, shelf_source
from logic.identification import apply_ai_result, build_query, compute_status, dismiss_field, run_book_identifier
from models import (
AIIdentifyResult,
BoundaryDetectResult,
BookRow,
CandidateRecord,
PluginLookupResult,
TextRecognizeResult,
)
# ── BookRow factory ───────────────────────────────────────────────────────────
def _book(**kwargs: object) -> BookRow:
defaults: dict[str, object] = {
"id": "b1",
"shelf_id": "s1",
"position": 0,
"image_filename": None,
"title": "",
"author": "",
"year": "",
"isbn": "",
"publisher": "",
"notes": "",
"raw_text": "",
"ai_title": "",
"ai_author": "",
"ai_year": "",
"ai_isbn": "",
"ai_publisher": "",
"identification_status": "unidentified",
"title_confidence": 0.0,
"analyzed_at": None,
"created_at": "2024-01-01T00:00:00",
"candidates": None,
}
defaults.update(kwargs)
return BookRow(**defaults) # type: ignore[arg-type]
# ── DB fixture for integration tests ─────────────────────────────────────────
@pytest.fixture
def seeded_db(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
"""Temporary DB with a single book row (full parent chain)."""
monkeypatch.setattr(db_module, "DB_PATH", tmp_path / "test.db")
db_module.init_db()
ts = "2024-01-01T00:00:00"
c = db_module.conn()
c.execute("INSERT INTO rooms VALUES (?,?,?,?)", ["r1", "Room", 1, ts])
c.execute("INSERT INTO cabinets VALUES (?,?,?,?,?,?,?,?)", ["c1", "r1", "Cabinet", None, None, None, 1, ts])
c.execute("INSERT INTO shelves VALUES (?,?,?,?,?,?,?,?)", ["s1", "c1", "Shelf", None, None, None, 1, ts])
c.execute(
"INSERT INTO books VALUES (?,?,0,NULL,'','','','','','','','','','','','','unidentified',0,NULL,?,NULL)",
["b1", "s1", ts],
)
c.commit()
c.close()
# ── Stub plugins ──────────────────────────────────────────────────────────────
class _BoundaryDetectorStub:
"""Stub boundary detector that returns empty boundaries."""
plugin_id = "bd_stub"
name = "Stub BD"
auto_queue = False
target = "books"
@property
def max_image_px(self) -> int:
return 1600
def detect(self, image_b64: str, image_mime: str) -> BoundaryDetectResult:
return {"boundaries": [0.5]}
class _BoundaryDetectorShelvesStub:
"""Stub boundary detector targeting shelves (for cabinet entity_type)."""
plugin_id = "bd_shelves_stub"
name = "Stub BD Shelves"
auto_queue = False
target = "shelves"
@property
def max_image_px(self) -> int:
return 1600
def detect(self, image_b64: str, image_mime: str) -> BoundaryDetectResult:
return {"boundaries": []}
class _TextRecognizerStub:
"""Stub text recognizer that returns fixed text."""
plugin_id = "tr_stub"
name = "Stub TR"
auto_queue = False
@property
def max_image_px(self) -> int:
return 1600
def recognize(self, image_b64: str, image_mime: str) -> TextRecognizeResult:
return {"raw_text": "Stub Title", "title": "Stub Title", "author": "Stub Author"}
class _BookIdentifierStub:
"""Stub book identifier that returns a high-confidence result."""
plugin_id = "bi_stub"
name = "Stub BI"
auto_queue = False
@property
def confidence_threshold(self) -> float:
return 0.8
def identify(self, raw_text: str) -> AIIdentifyResult:
return {
"title": "Found Book",
"author": "Found Author",
"year": "2000",
"isbn": "",
"publisher": "",
"confidence": 0.9,
}
class _ArchiveSearcherStub:
"""Stub archive searcher that returns an empty result list."""
plugin_id = "as_stub"
name = "Stub AS"
auto_queue = False
def search(self, query: str) -> list[CandidateRecord]:
return []
# ── bounds_for_index ──────────────────────────────────────────────────────────
def test_bounds_empty_boundaries() -> None:
assert bounds_for_index(None, 0) == (0.0, 1.0)
def test_bounds_empty_json() -> None:
assert bounds_for_index("[]", 0) == (0.0, 1.0)
def test_bounds_single_boundary_first() -> None:
assert bounds_for_index("[0.5]", 0) == (0.0, 0.5)
def test_bounds_single_boundary_second() -> None:
assert bounds_for_index("[0.5]", 1) == (0.5, 1.0)
def test_bounds_multiple_boundaries() -> None:
b = "[0.25, 0.5, 0.75]"
assert bounds_for_index(b, 0) == (0.0, 0.25)
assert bounds_for_index(b, 1) == (0.25, 0.5)
assert bounds_for_index(b, 2) == (0.5, 0.75)
assert bounds_for_index(b, 3) == (0.75, 1.0)
def test_bounds_out_of_range_returns_last_segment() -> None:
_, end = bounds_for_index("[0.5]", 99)
assert end == 1.0
# ── compute_status ────────────────────────────────────────────────────────────
def test_compute_status_unidentified_no_ai_title() -> None:
assert compute_status(_book(ai_title="", title="", author="", year="")) == "unidentified"
def test_compute_status_unidentified_empty() -> None:
assert compute_status(_book()) == "unidentified"
def test_compute_status_ai_identified() -> None:
book = _book(ai_title="Some Book", ai_author="Author", ai_year="2000", ai_isbn="", ai_publisher="")
assert compute_status(book) == "ai_identified"
def test_compute_status_user_approved() -> None:
book = _book(
ai_title="Some Book",
ai_author="Author",
ai_year="2000",
ai_isbn="",
ai_publisher="",
title="Some Book",
author="Author",
year="2000",
isbn="",
publisher="",
)
assert compute_status(book) == "user_approved"
def test_compute_status_ai_identified_when_fields_differ() -> None:
book = _book(
ai_title="Some Book",
ai_author="Original Author",
ai_year="2000",
title="Some Book",
author="Different Author",
year="2000",
)
assert compute_status(book) == "ai_identified"
# ── build_query ───────────────────────────────────────────────────────────────
def test_build_query_from_candidates() -> None:
book = _book(candidates='[{"source": "x", "author": "Tolkien", "title": "LOTR"}]')
assert build_query(book) == "Tolkien LOTR"
def test_build_query_from_ai_fields() -> None:
book = _book(candidates="[]", ai_author="Pushkin", ai_title="Evgeny Onegin", raw_text="")
assert build_query(book) == "Pushkin Evgeny Onegin"
def test_build_query_from_raw_text() -> None:
book = _book(candidates="[]", ai_author="", ai_title="", raw_text="some spine text")
assert build_query(book) == "some spine text"
def test_build_query_empty() -> None:
book = _book(candidates="[]", ai_author="", ai_title="", raw_text="")
assert build_query(book) == ""
def test_build_query_candidates_prefer_first_nonempty() -> None:
book = _book(
candidates='[{"source":"a","author":"","title":""}, {"source":"b","author":"Auth","title":"Title"}]',
ai_author="other",
ai_title="other",
)
assert build_query(book) == "Auth Title"
# ── apply_ai_result ───────────────────────────────────────────────────────────
def test_apply_ai_result_high_confidence(seeded_db: None) -> None:
result: AIIdentifyResult = {
"title": "My Book",
"author": "J. Doe",
"year": "1999",
"isbn": "123",
"publisher": "Pub",
"confidence": 0.9,
}
apply_ai_result("b1", result, confidence_threshold=0.8)
with db_module.connection() as c:
book = db_module.get_book(c, "b1")
assert book is not None
assert book.ai_title == "My Book"
assert book.ai_author == "J. Doe"
assert abs(book.title_confidence - 0.9) < 1e-9
assert book.identification_status == "ai_identified"
def test_apply_ai_result_low_confidence_skips_fields(seeded_db: None) -> None:
result: AIIdentifyResult = {
"title": "My Book",
"author": "J. Doe",
"year": "1999",
"isbn": "",
"publisher": "",
"confidence": 0.5,
}
apply_ai_result("b1", result, confidence_threshold=0.8)
with db_module.connection() as c:
book = db_module.get_book(c, "b1")
assert book is not None
assert book.ai_title == "" # not updated
assert abs(book.title_confidence - 0.5) < 1e-9 # confidence stored regardless
assert book.identification_status == "unidentified"
def test_apply_ai_result_exact_threshold(seeded_db: None) -> None:
result: AIIdentifyResult = {
"title": "Book",
"author": "",
"year": "",
"isbn": "",
"publisher": "",
"confidence": 0.8,
}
apply_ai_result("b1", result, confidence_threshold=0.8)
with db_module.connection() as c:
book = db_module.get_book(c, "b1")
assert book is not None
assert book.ai_title == "Book"
# ── shelf_source error conditions ─────────────────────────────────────────────
def test_shelf_source_not_found(seeded_db: None) -> None:
with db_module.connection() as c:
with pytest.raises(ShelfNotFoundError) as exc_info:
shelf_source(c, "nonexistent")
assert exc_info.value.shelf_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
def test_shelf_source_no_image(seeded_db: None) -> None:
# s1 has no photo_filename and c1 has no photo_filename → NoShelfImageError
with db_module.connection() as c:
with pytest.raises(NoShelfImageError) as exc_info:
shelf_source(c, "s1")
assert exc_info.value.shelf_id == "s1"
assert exc_info.value.cabinet_id == "c1"
assert "s1" in str(exc_info.value)
assert "c1" in str(exc_info.value)
# ── book_spine_source error conditions ────────────────────────────────────────
def test_book_spine_source_book_not_found(seeded_db: None) -> None:
with db_module.connection() as c:
with pytest.raises(BookNotFoundError) as exc_info:
book_spine_source(c, "nonexistent")
assert exc_info.value.book_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
def test_book_spine_source_propagates_no_shelf_image(seeded_db: None) -> None:
# b1 exists but s1 has no image → NoShelfImageError propagates through book_spine_source
with db_module.connection() as c:
with pytest.raises(NoShelfImageError) as exc_info:
book_spine_source(c, "b1")
assert exc_info.value.shelf_id == "s1"
assert exc_info.value.cabinet_id == "c1"
# ── run_boundary_detector error conditions ────────────────────────────────────
def test_run_boundary_detector_cabinet_not_found(seeded_db: None) -> None:
plugin = _BoundaryDetectorShelvesStub()
with pytest.raises(CabinetNotFoundError) as exc_info:
run_boundary_detector(plugin, "cabinets", "nonexistent")
assert exc_info.value.cabinet_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
def test_run_boundary_detector_no_cabinet_photo(seeded_db: None) -> None:
# c1 exists but has no photo_filename
plugin = _BoundaryDetectorShelvesStub()
with pytest.raises(NoCabinetPhotoError) as exc_info:
run_boundary_detector(plugin, "cabinets", "c1")
assert exc_info.value.cabinet_id == "c1"
assert "c1" in str(exc_info.value)
def test_run_boundary_detector_shelf_not_found(seeded_db: None) -> None:
plugin = _BoundaryDetectorStub()
with pytest.raises(ShelfNotFoundError) as exc_info:
run_boundary_detector(plugin, "shelves", "nonexistent")
assert exc_info.value.shelf_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
def test_run_boundary_detector_shelf_no_image(seeded_db: None) -> None:
# s1 exists but has no image (neither override nor cabinet photo)
plugin = _BoundaryDetectorStub()
with pytest.raises(NoShelfImageError) as exc_info:
run_boundary_detector(plugin, "shelves", "s1")
assert exc_info.value.shelf_id == "s1"
assert exc_info.value.cabinet_id == "c1"
# ── run_book_identifier error conditions ──────────────────────────────────────
def test_run_book_identifier_not_found(seeded_db: None) -> None:
plugin = _BookIdentifierStub()
with pytest.raises(BookNotFoundError) as exc_info:
run_book_identifier(plugin, "nonexistent")
assert exc_info.value.book_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
def test_run_book_identifier_no_raw_text(seeded_db: None) -> None:
# b1 has raw_text='' (default)
plugin = _BookIdentifierStub()
with pytest.raises(NoRawTextError) as exc_info:
run_book_identifier(plugin, "b1")
assert exc_info.value.book_id == "b1"
assert "b1" in str(exc_info.value)
# ── run_archive_searcher error conditions ─────────────────────────────────────
def test_run_archive_searcher_not_found(seeded_db: None) -> None:
plugin = _ArchiveSearcherStub()
with pytest.raises(BookNotFoundError) as exc_info:
run_archive_searcher(plugin, "nonexistent")
assert exc_info.value.book_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
# ── dismiss_field error conditions ────────────────────────────────────────────
def test_dismiss_field_not_found(seeded_db: None) -> None:
with pytest.raises(BookNotFoundError) as exc_info:
dismiss_field("nonexistent", "title", "some value")
assert exc_info.value.book_id == "nonexistent"
assert "nonexistent" in str(exc_info.value)
# ── dispatch_plugin error conditions ──────────────────────────────────────────
def _run_dispatch(plugin_id: str, lookup: PluginLookupResult, entity_type: str, entity_id: str) -> None:
"""Helper to synchronously drive the async dispatch_plugin."""
async def _inner() -> None:
loop = asyncio.get_event_loop()
await logic.dispatch_plugin(plugin_id, lookup, entity_type, entity_id, loop)
asyncio.run(_inner())
def test_dispatch_plugin_not_found() -> None:
with pytest.raises(PluginNotFoundError) as exc_info:
_run_dispatch("no_such_plugin", (None, None), "books", "b1")
assert exc_info.value.plugin_id == "no_such_plugin"
assert "no_such_plugin" in str(exc_info.value)
def test_dispatch_plugin_boundary_wrong_entity_type() -> None:
lookup = ("boundary_detector", _BoundaryDetectorStub())
with pytest.raises(InvalidPluginEntityError) as exc_info:
_run_dispatch("bd_stub", lookup, "books", "b1")
assert exc_info.value.plugin_category == "boundary_detector"
assert exc_info.value.entity_type == "books"
assert "boundary_detector" in str(exc_info.value)
assert "books" in str(exc_info.value)
def test_dispatch_plugin_target_mismatch_cabinets(seeded_db: None) -> None:
# Plugin targets "books" but entity_type is "cabinets" (expects target="shelves")
plugin = _BoundaryDetectorStub() # target = "books"
lookup = ("boundary_detector", plugin)
with pytest.raises(PluginTargetMismatchError) as exc_info:
_run_dispatch("bd_stub", lookup, "cabinets", "c1")
assert exc_info.value.plugin_id == "bd_stub"
assert exc_info.value.expected_target == "shelves"
assert exc_info.value.actual_target == "books"
assert "bd_stub" in str(exc_info.value)
def test_dispatch_plugin_target_mismatch_shelves(seeded_db: None) -> None:
# Plugin targets "shelves" but entity_type is "shelves" (expects target="books")
plugin = _BoundaryDetectorShelvesStub() # target = "shelves"
lookup = ("boundary_detector", plugin)
with pytest.raises(PluginTargetMismatchError) as exc_info:
_run_dispatch("bd_shelves_stub", lookup, "shelves", "s1")
assert exc_info.value.plugin_id == "bd_shelves_stub"
assert exc_info.value.expected_target == "books"
assert exc_info.value.actual_target == "shelves"
def test_dispatch_plugin_text_recognizer_wrong_entity_type() -> None:
lookup = ("text_recognizer", _TextRecognizerStub())
with pytest.raises(InvalidPluginEntityError) as exc_info:
_run_dispatch("tr_stub", lookup, "cabinets", "c1")
assert exc_info.value.plugin_category == "text_recognizer"
assert exc_info.value.entity_type == "cabinets"
def test_dispatch_plugin_book_identifier_wrong_entity_type() -> None:
lookup = ("book_identifier", _BookIdentifierStub())
with pytest.raises(InvalidPluginEntityError) as exc_info:
_run_dispatch("bi_stub", lookup, "shelves", "s1")
assert exc_info.value.plugin_category == "book_identifier"
assert exc_info.value.entity_type == "shelves"
def test_dispatch_plugin_archive_searcher_wrong_entity_type() -> None:
lookup = ("archive_searcher", _ArchiveSearcherStub())
with pytest.raises(InvalidPluginEntityError) as exc_info:
_run_dispatch("as_stub", lookup, "cabinets", "c1")
assert exc_info.value.plugin_category == "archive_searcher"
assert exc_info.value.entity_type == "cabinets"
# ── Exception string representation ───────────────────────────────────────────
def test_exception_str_cabinet_not_found() -> None:
exc = CabinetNotFoundError("cab-123")
assert exc.cabinet_id == "cab-123"
assert "cab-123" in str(exc)
def test_exception_str_shelf_not_found() -> None:
exc = ShelfNotFoundError("shelf-456")
assert exc.shelf_id == "shelf-456"
assert "shelf-456" in str(exc)
def test_exception_str_plugin_not_found() -> None:
exc = PluginNotFoundError("myplugin")
assert exc.plugin_id == "myplugin"
assert "myplugin" in str(exc)
def test_exception_str_no_shelf_image() -> None:
exc = NoShelfImageError("s1", "c1")
assert exc.shelf_id == "s1"
assert exc.cabinet_id == "c1"
assert "s1" in str(exc)
assert "c1" in str(exc)
def test_exception_str_no_cabinet_photo() -> None:
exc = NoCabinetPhotoError("c1")
assert exc.cabinet_id == "c1"
assert "c1" in str(exc)
def test_exception_str_no_raw_text() -> None:
exc = NoRawTextError("b1")
assert exc.book_id == "b1"
assert "b1" in str(exc)
def test_exception_str_invalid_plugin_entity() -> None:
exc = InvalidPluginEntityError("text_recognizer", "cabinets")
assert exc.plugin_category == "text_recognizer"
assert exc.entity_type == "cabinets"
assert "text_recognizer" in str(exc)
assert "cabinets" in str(exc)
def test_exception_str_plugin_target_mismatch() -> None:
exc = PluginTargetMismatchError("my_bd", "shelves", "books")
assert exc.plugin_id == "my_bd"
assert exc.expected_target == "shelves"
assert exc.actual_target == "books"
assert "my_bd" in str(exc)
assert "shelves" in str(exc)
assert "books" in str(exc)

149
tests/test_storage.py Normal file
View File

@@ -0,0 +1,149 @@
"""Unit tests for db.py, files.py, and config.py: DB helpers, name/position counters, settings merge."""
import sqlite3
from collections.abc import Iterator
from pathlib import Path
import pytest
import db
import files
from config import deep_merge
@pytest.fixture(autouse=True)
def reset_counters() -> Iterator[None]:
db.COUNTERS.clear()
yield
db.COUNTERS.clear()
@pytest.fixture
def test_db(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Iterator[sqlite3.Connection]:
"""Temporary SQLite database with full schema applied."""
monkeypatch.setattr(db, "DB_PATH", tmp_path / "test.db")
monkeypatch.setattr(files, "DATA_DIR", tmp_path)
monkeypatch.setattr(files, "IMAGES_DIR", tmp_path / "images")
files.init_dirs()
db.init_db()
connection = db.conn()
yield connection
connection.close()
# ── deep_merge ────────────────────────────────────────────────────────────────
def test_deep_merge_basic() -> None:
result = deep_merge({"a": 1, "b": 2}, {"b": 3, "c": 4})
assert result == {"a": 1, "b": 3, "c": 4}
def test_deep_merge_nested() -> None:
base = {"x": {"a": 1, "b": 2}}
override = {"x": {"b": 99, "c": 3}}
result = deep_merge(base, override)
assert result == {"x": {"a": 1, "b": 99, "c": 3}}
def test_deep_merge_list_replacement() -> None:
base = {"items": [1, 2, 3]}
override = {"items": [4, 5]}
result = deep_merge(base, override)
assert result["items"] == [4, 5]
def test_deep_merge_does_not_mutate_base() -> None:
base = {"a": {"x": 1}}
deep_merge(base, {"a": {"x": 2}})
assert base["a"]["x"] == 1
# ── uid / now ────────────────────────────────────────────────────────────────
def test_uid_unique() -> None:
assert db.uid() != db.uid()
def test_uid_is_string() -> None:
result = db.uid()
assert isinstance(result, str)
assert len(result) == 36 # UUID4 format
def test_now_is_string() -> None:
result = db.now()
assert isinstance(result, str)
assert "T" in result # ISO format
# ── next_name ────────────────────────────────────────────────────────────────
def test_next_name_increments() -> None:
assert db.next_name("Room") == "Room 1"
assert db.next_name("Room") == "Room 2"
assert db.next_name("Room") == "Room 3"
def test_next_name_independent_prefixes() -> None:
assert db.next_name("Room") == "Room 1"
assert db.next_name("Shelf") == "Shelf 1"
assert db.next_name("Room") == "Room 2"
# ── next_pos / next_root_pos ────────────────────────────────────────────────
def test_next_root_pos_empty(test_db: sqlite3.Connection) -> None:
pos = db.next_root_pos(test_db, "rooms")
assert pos == 1
def test_next_root_pos_with_rows(test_db: sqlite3.Connection) -> None:
ts = db.now()
test_db.execute("INSERT INTO rooms VALUES (?,?,?,?)", ["r1", "Room 1", 1, ts])
test_db.execute("INSERT INTO rooms VALUES (?,?,?,?)", ["r2", "Room 2", 2, ts])
test_db.commit()
assert db.next_root_pos(test_db, "rooms") == 3
def test_next_pos_empty(test_db: sqlite3.Connection) -> None:
ts = db.now()
test_db.execute("INSERT INTO rooms VALUES (?,?,?,?)", ["r1", "Room", 1, ts])
test_db.commit()
pos = db.next_pos(test_db, "cabinets", "room_id", "r1")
assert pos == 1
def test_next_pos_with_children(test_db: sqlite3.Connection) -> None:
ts = db.now()
test_db.execute("INSERT INTO rooms VALUES (?,?,?,?)", ["r1", "Room", 1, ts])
test_db.execute("INSERT INTO cabinets VALUES (?,?,?,?,?,?,?,?)", ["c1", "r1", "C1", None, None, None, 1, ts])
test_db.execute("INSERT INTO cabinets VALUES (?,?,?,?,?,?,?,?)", ["c2", "r1", "C2", None, None, None, 2, ts])
test_db.commit()
pos = db.next_pos(test_db, "cabinets", "room_id", "r1")
assert pos == 3
# ── init_db ────────────────────────────────────────────────────────────────────
def test_init_db_creates_tables(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(db, "DB_PATH", tmp_path / "test.db")
db.init_db()
connection = sqlite3.connect(tmp_path / "test.db")
tables = {row[0] for row in connection.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
connection.close()
assert {"rooms", "cabinets", "shelves", "books"}.issubset(tables)
# ── init_dirs ─────────────────────────────────────────────────────────────────
def test_init_dirs_creates_images_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(files, "DATA_DIR", tmp_path)
monkeypatch.setattr(files, "IMAGES_DIR", tmp_path / "images")
files.init_dirs()
assert (tmp_path / "images").is_dir()