Skip to content

Python API Reference

This reference is auto-generated from source code docstrings.

Core Engine

HsrsEngine

mistaber.engine.engine.HsrsEngine

Main engine for halachic symbolic reasoning.

Provides query interface for multi-world reasoning using Kripke semantics.

Source code in mistaber/engine/engine.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
class HsrsEngine:
    """
    Main engine for halachic symbolic reasoning.

    Provides query interface for multi-world reasoning using
    Kripke semantics.
    """

    def __init__(self, ontology_path: Path, default_world: str = "base"):
        """
        Create an engine and load the ontology right away.

        Args:
            ontology_path: Path to ontology directory
            default_world: Default world for queries

        Raises:
            ImportError: If the ``clingo`` module is not available
                (the module-level ``clingo`` name is None).
        """
        if clingo is None:
            message = "clingo is required to use HsrsEngine. Install with: pip install clingo"
            raise ImportError(message)

        # Solver state starts out empty; _load_ontology() below fills it in.
        self._is_loaded = False
        self._ctl: Optional[clingo.Control] = None
        # The completeness checker is created on first use.
        self._completeness_checker: Optional[CompletenessChecker] = None

        self.default_world = default_world
        self.ontology_path = Path(ontology_path)
        self._world_manager = WorldManager()
        self._config = EngineConfig(world=default_world)

        self._load_ontology()

    @property
    def is_loaded(self) -> bool:
        """Return True if ontology is loaded.

        Mirrors the private ``_is_loaded`` flag, which ``__init__`` sets to
        False and ``_load_ontology`` sets to True once a solver control has
        been built.
        """
        return self._is_loaded

    def _load_ontology(self) -> None:
        """Load all ontology files.

        Builds a fresh control from the current ``self._config`` via
        ``_build_control``, stores it on ``self._ctl`` (replacing any earlier
        control) and marks the engine as loaded.
        """
        self._ctl = self._build_control(config=self._config)
        self._is_loaded = True

    def _build_control(self, *, config: EngineConfig, extra_facts: str = "") -> "clingo.Control":
        """Create and ground a new clingo control for the given configuration.

        Files are loaded in this order: ``*.lp`` files (sorted) from the
        ``schema``, ``base`` and ``worlds`` sub-directories of the ontology,
        the engine rule files found in the sibling ``engine`` directory, and
        finally ``*.lp`` files (sorted) from ``interpretations``. Missing
        directories and missing engine files are skipped silently.

        Args:
            config: Runtime configuration; its ``to_asp_facts()`` output is
                added to the "base" program.
            extra_facts: Optional ASP text. When it is not blank it is added
                as a separate program part named "scenario" and grounded
                together with "base".

        Returns:
            The grounded control.
        """
        assert clingo is not None
        control = clingo.Control(["--warn=none"])
        root = self.ontology_path

        def load_folder(folder) -> None:
            # Sorted so the load order is stable across runs.
            for source in sorted(folder.glob("*.lp")):
                control.load(str(source))

        # Ontology files (schema/base/worlds).
        for sub_dir in ("schema", "base", "worlds"):
            folder = root / sub_dir
            if folder.exists():
                load_folder(folder)

        # Engine rules live next to the ontology directory, not inside it.
        engine_dir = root.parent / "engine"
        engine_rule_files = ("safek.lp", "priorities.lp", "interpretations.lp", "policy.lp")
        for file_name in engine_rule_files:
            candidate = engine_dir / file_name
            if candidate.exists():
                control.load(str(candidate))

        # Individual commentator interpretations.
        interpretations = root / "interpretations"
        if interpretations.exists():
            load_folder(interpretations)

        # Runtime configuration facts go into the base program.
        control.add("base", [], config.to_asp_facts())

        parts = [("base", [])]
        if extra_facts.strip():
            control.add("scenario", [], extra_facts)
            parts.append(("scenario", []))
        control.ground(parts)

        return control

    def configure(
        self,
        world: Optional[str] = None,
        interpretation_precedence: Optional[List[str]] = None,
        safek_resolution: Optional[str] = None,
        context: Optional[str] = None,
        policy: Optional[Dict[str, str]] = None,
    ) -> "HsrsEngine":
        """
        Configure engine runtime settings.

        Args:
            world: Default world for queries
            interpretation_precedence: Ordered list of commentators
            safek_resolution: How to resolve doubt ("chumra", "kula", "madrega")
            context: Normative context ("lechatchila", "bediavad")
            policy: Policy overrides dict

        Returns:
            self (for method chaining)
        """
        if world is not None:
            self._config.world = world
        if interpretation_precedence is not None:
            self._config.interpretation_precedence = interpretation_precedence
        if safek_resolution is not None:
            self._config.safek_resolution = safek_resolution
        if context is not None:
            self._config.context = context
        if policy is not None:
            self._config.policy.update(policy)
        # Apply new runtime config to the solver.
        self._load_ontology()
        return self

    def ask(self, atom: str, world: Optional[str] = None) -> bool:
        """
        Boolean query - does this atom hold?

        Args:
            atom: Ground atom like "world(base)" or "forbidden(W, achiila, m1, ctx)"
            world: Optional world context (for future use with world-scoped queries)

        Returns:
            True if atom holds in any model, False otherwise
        """
        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        # World-aware queries are handled at the ASP level via holds/2.
        # At the engine level, ask() is a boolean existence check for a pattern.
        _ = world  # reserved for future world-scoped query sugar
        return len(self.query(atom)) > 0

    def _asprin_show_for_pattern(self, pattern: str) -> str:
        """Return a `#show` directive to minimize asprin output for a given pattern.

        asprin (like clingo) prints only shown atoms. For preferred queries we
        only need atoms of the predicate being matched. This keeps I/O bounded
        for large ontologies.
        """
        pattern = pattern.strip()
        if pattern == "_":
            return "#show.\n"

        from mistaber.engine.asp_pattern import parse_term

        term = parse_term(pattern)
        if term.kind == "fun":
            return f"#show {term.name}/{len(term.args)}.\n"
        if term.kind == "const":
            return f"#show {term.name}/0.\n"
        # Variables / wildcards match anything; show all atoms.
        return "#show.\n"

    def query(self, pattern: str, policy: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
        """
        Match a pattern against the atoms of the first model.

        Args:
            pattern: ASP atom pattern like "world(X)" or "forbidden(W, A, F, C)"
            policy: Optional policy overrides dict (merges with config policy).
                When given (and non-empty) the query runs against a temporary
                control; the engine's own configuration is left untouched.

        Returns:
            List of dicts with variable bindings; empty when there is no model.

        Raises:
            RuntimeError: If the ontology has not been loaded.
        """
        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        import copy

        if policy:
            # Work on a copy so the per-call overrides never leak into self._config.
            merged_config = copy.deepcopy(self._config)
            merged_config.policy.update(policy)
            control = self._build_control(config=merged_config)
        else:
            control = self._ctl

        assert control is not None
        matches: List[Dict[str, Any]] = []
        with control.solve(yield_=True) as handle:
            for model in handle:
                # Only the first model is inspected.
                shown = list(map(str, model.symbols(atoms=True)))
                matches = query_atoms(pattern, shown)
                break
        return matches

    def atoms(self) -> List[str]:
        """Return all atoms from the first stable model as strings."""
        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        assert self._ctl is not None
        with self._ctl.solve(yield_=True) as handle:
            for model in handle:
                return [str(a) for a in model.symbols(atoms=True)]
        return []

    def compare(
        self,
        pattern: str,
        worlds: Optional[List[str]] = None,
        policy: Optional[Dict[str, str]] = None,
    ) -> Dict[str, List[Dict[str, Any]]]:
        """
        Compare query results across multiple worlds.

        The atoms of the first model are handed to ``compare_atoms`` together
        with the pattern and the requested worlds.

        Args:
            pattern: ASP atom pattern
            worlds: List of worlds to compare (default: all worlds)
            policy: Optional policy overrides dict (merges with config policy).
                When given (and non-empty) a temporary control is built and
                the engine's own configuration is left untouched.

        Returns:
            Dict mapping world name to list of results; empty when the solver
            yields no model.

        Raises:
            RuntimeError: If the ontology has not been loaded.
        """
        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        import copy

        if policy:
            # Per-call overrides are applied to a copy of the configuration.
            merged_config = copy.deepcopy(self._config)
            merged_config.policy.update(policy)
            control = self._build_control(config=merged_config)
        else:
            control = self._ctl

        assert control is not None
        per_world: Dict[str, List[Dict[str, Any]]] = {}
        with control.solve(yield_=True) as handle:
            for model in handle:
                # Only the first model is inspected.
                shown = list(map(str, model.symbols(atoms=True)))
                per_world = compare_atoms(pattern, shown, worlds=worlds)
                break
        return per_world

    def analyze(
        self, scenario: str, world: Optional[str] = None, policy: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """
        Analyze a scenario (push mode).

        Args:
            scenario: ASP code describing the scenario
            world: World context for analysis (default: default_world)
            policy: Optional policy overrides dict (merges with config policy)

        Returns:
            Dict with 'world', 'atoms', and analysis results
        """
        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        import copy

        world = world or self.default_world

        cfg = copy.deepcopy(self._config)
        cfg.world = world
        if policy:
            cfg.policy.update(policy)

        scenario_ctl = self._build_control(config=cfg, extra_facts=scenario)

        with scenario_ctl.solve(yield_=True) as handle:
            for model in handle:
                atoms = [str(a) for a in model.symbols(atoms=True)]
                break
            else:
                atoms = []

        return {"world": world, "atoms": atoms, "scenario": scenario.strip()}

    def query_preferred(self, pattern: str, world: Optional[str] = None) -> List[Dict[str, Any]]:
        """
        Query with asprin preference optimization.

        Returns only the preferred (optimal) outcomes based on the preference
        ordering defined in preferences.lp:
        1. madrega strength (d_oraita > d_rabanan > minhag > chumra)
        2. safek handling (stringent for biblical, lenient for rabbinic)
        3. world priority (explicit priority declarations)
        4. rule specificity (more conditions = more specific = preferred)

        asprin is run as an external process on the ontology files plus a
        temporary file holding the runtime configuration facts and a `#show`
        directive for the queried pattern. The method falls back to the
        regular ``query(pattern)`` when asprin cannot be found, cannot be
        started, times out (30 seconds) or exits with an unexpected status.

        Args:
            pattern: ASP atom pattern like "world(X)" or "derives(W, Conclusion)"
            world: Optional world context for the query (default: default_world)

        Returns:
            List of optimal results according to preference ordering.
            Each result is a dict with variable bindings from the pattern.

        Raises:
            RuntimeError: If the ontology has not been loaded.
        """
        import copy
        import os
        import subprocess
        import tempfile

        from mistaber.engine.external_tools import find_tool

        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        world = world or self.default_world

        asprin_bin = find_tool("asprin")
        if asprin_bin is None:
            return self.query(pattern)

        # Per-call configuration; the engine's own config stays untouched.
        cfg = copy.deepcopy(self._config)
        cfg.world = world

        # Build the file list using real paths to preserve #include resolution.
        schema_files = sorted((self.ontology_path / "schema").glob("*.lp"))
        base_files = sorted((self.ontology_path / "base").glob("*.lp"))
        world_files = sorted((self.ontology_path / "worlds").glob("*.lp"))

        engine_dir = self.ontology_path.parent / "engine"
        engine_files = [
            engine_dir / "safek.lp",
            engine_dir / "priorities.lp",
            engine_dir / "interpretations.lp",
            engine_dir / "policy.lp",
            engine_dir / "preferences.lp",
        ]

        interp_dir = self.ontology_path / "interpretations"
        interp_files = sorted(interp_dir.glob("*.lp")) if interp_dir.exists() else []

        # Build the injected program text *before* creating the temp file, so a
        # failure while rendering it cannot leave a stray file behind. It holds
        # the runtime config facts and a #show directive for the pattern.
        injected_program = cfg.to_asp_facts() + "\n" + self._asprin_show_for_pattern(pattern)

        # delete=False because asprin must be able to open the file by name
        # after we have closed it; it is removed in the `finally` below.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".lp", delete=False, encoding="utf-8"
        ) as f:
            f.write(injected_program)
            temp_path = f.name

        try:
            # Run asprin with optimization
            result = subprocess.run(
                [
                    asprin_bin,
                    "-q",
                    "1",  # print only optimal models
                    "-n",
                    "1",
                    *map(str, schema_files + base_files + world_files),
                    *[str(p) for p in engine_files if p.exists()],
                    *map(str, interp_files),
                    temp_path,
                ],
                capture_output=True,
                text=True,
                timeout=30,
            )
        except (OSError, subprocess.TimeoutExpired):
            # Same policy as _check_asprin_available: any OS-level failure to
            # launch asprin (missing binary, no execute permission, ...) or a
            # timeout means we fall back to the regular query.
            return self.query(pattern)
        finally:
            try:
                os.unlink(temp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask the result.
                pass

        # Exit codes treated as a normal solver outcome; anything else counts
        # as an unexpected asprin failure.
        if result.returncode not in (0, 10, 20, 30):
            return self.query(pattern)

        return self._parse_asprin_output(result.stdout, pattern)

    def _check_asprin_available(self) -> bool:
        """Report whether an asprin executable can be located and run.

        The executable is looked up with ``find_tool("asprin")`` and probed
        with ``--version`` (5 second timeout).

        Returns:
            True if the probe exits with status 0; False if the tool is not
            found, cannot be started, times out or exits with another status.
        """
        import subprocess

        from mistaber.engine.external_tools import find_tool

        executable = find_tool("asprin")
        if executable is None:
            return False

        probe_command = [executable, "--version"]
        try:
            probe = subprocess.run(probe_command, capture_output=True, text=True, timeout=5)
        except (OSError, subprocess.TimeoutExpired):
            return False
        return probe.returncode == 0

    def _parse_asprin_output(self, output: str, pattern: str) -> List[Dict[str, Any]]:
        """Parse asprin output and extract matching atoms.

        asprin output format includes "Answer: N" lines followed by atom lists,
        and potentially "OPTIMUM FOUND" markers for optimal solutions. Only the
        atoms of the *last* "Answer:" block are used, since asprin may print
        several improving models before the final one.

        An empty last block yields no atoms: atoms from an earlier,
        superseded answer are never carried over. Collection stops at an
        "OPTIMUM..." / "SATISFIABLE" line so trailing solver statistics are
        not mistaken for atoms.

        Args:
            output: Raw stdout from asprin
            pattern: ASP atom pattern to match

        Returns:
            List of dicts with variable bindings for matching atoms.
        """
        last_answer: list[str] = []
        collecting = False

        for line in output.split("\n"):
            line = line.strip()
            if not line or line.startswith("%"):
                continue

            if line.startswith("Answer:"):
                # A new answer supersedes everything collected so far, even if
                # it turns out to contain no shown atoms at all.
                last_answer = []
                collecting = True
                continue

            # End markers close the current answer block.
            if line.startswith(("OPTIMUM", "SATISFIABLE")):
                collecting = False
                continue

            if not collecting:
                continue

            # Drop a trailing comment on the atom line.
            line = line.partition("%")[0]
            # clingo/asprin can print other text around the model; tokens are
            # kept only when they start with a lowercase letter or underscore,
            # as predicate names do.
            last_answer.extend(
                tok for tok in line.split() if tok[0].islower() or tok[0] == "_"
            )

        # Reuse the engine's clingo-free matcher for robust nested-term unification.
        return query_atoms(pattern, last_answer)

    def _parse_atom_args(self, args_str: str) -> List[str]:
        """Parse atom arguments, handling nested terms.

        Args:
            args_str: String of comma-separated arguments, e.g., "foo,bar(x,y),baz"

        Returns:
            List of argument strings.
        """
        args = []
        current = []
        paren_depth = 0

        for char in args_str:
            if char == "(":
                paren_depth += 1
                current.append(char)
            elif char == ")":
                paren_depth -= 1
                current.append(char)
            elif char == "," and paren_depth == 0:
                args.append("".join(current).strip())
                current = []
            else:
                current.append(char)

        # Don't forget the last argument
        if current:
            args.append("".join(current).strip())

        return args

    def explain(
        self, atom: str, world: Optional[str] = None, additional_facts: str = ""
    ) -> Dict[str, Any]:
        """
        Generate an explanation for why an atom holds using xclingo2.

        This method provides derivation tree explanations showing the reasoning
        chain that led to a conclusion. It uses xclingo2 when available, falling
        back to basic verification when xclingo is not installed.

        The work is delegated to a new ``XclingoExplainer`` created from
        ``self.ontology_path`` on every call; the engine's own solver control
        is not used here, and unlike the query methods this one does not check
        ``is_loaded`` first.

        The explanation includes:
        - The derivation tree structure
        - Rule IDs that fired
        - Supporting facts and intermediate conclusions
        - Source citations (makor) when present in trace annotations

        Args:
            atom: The atom to explain (e.g., "world(base)", "forbidden(W,A,S,C)")
            world: World context for the explanation (default: default_world)
            additional_facts: Additional ASP facts to include in the program

        Returns:
            Dictionary containing:
            - atom: The queried atom
            - world: The world context
            - using_xclingo: Whether xclingo was used for the explanation
            - derivation: Dictionary representation of the derivation tree
            - tree: Human-readable text representation
            - raw_output: Raw xclingo output (if xclingo was used)

        Example:
            >>> engine = HsrsEngine(ontology_path)
            >>> result = engine.explain("world(base)", world="base")
            >>> print(result["tree"])
        """
        # Imported lazily, at call time rather than at module import.
        from mistaber.engine.xclingo_explain import XclingoExplainer

        # A falsy world (None or "") falls back to the engine default.
        world = world or self.default_world
        explainer = XclingoExplainer(self.ontology_path)
        return explainer.explain(atom, world, additional_facts)

    def _get_completeness_checker(self) -> CompletenessChecker:
        """Get or create the completeness checker instance.

        Lazily initializes the CompletenessChecker on first use to avoid
        circular import issues during engine initialization.

        Returns:
            CompletenessChecker instance
        """
        if self._completeness_checker is None:
            self._completeness_checker = CompletenessChecker(self)
        return self._completeness_checker

    def check_completeness(
        self, query: str, facts: Union[Set[str], List[str]]
    ) -> Tuple[bool, List[MissingFact]]:
        """Check if all required OWA predicates have known values for the query.

        In Open World Assumption (OWA), missing facts are unknown (not false).
        Before querying, this method verifies that all required OWA predicates
        have values for the relevant entities. If not, the query result would
        be uncertain (safek).

        This is essential for halachic reasoning where absence of information
        should lead to questions rather than assumptions.

        The check itself is performed by the engine's ``CompletenessChecker``
        (created on first use); this method only forwards ``query`` and
        ``facts`` to it and returns its result unchanged.

        Args:
            query: ASP atom pattern (e.g., "is_kosher(chicken)")
            facts: Set or list of fact strings representing known facts
                (e.g., {"is_food(chicken).", "is_kosher(chicken)."})

        Returns:
            Tuple of (is_complete, missing_facts):
            - is_complete: True if all required OWA predicates have known values
            - missing_facts: List of MissingFact objects for unknown predicates

        Example:
            >>> engine = HsrsEngine(ontology_path)
            >>> facts = {"is_food(chicken).", "food_cat(chicken, basar)."}
            >>> is_complete, missing = engine.check_completeness(
            ...     "is_kosher(chicken)", facts
            ... )
            >>> if not is_complete:
            ...     for mf in missing:
            ...         print(f"Need to know: {mf.question}")
        """
        checker = self._get_completeness_checker()
        return checker.check_completeness(query, facts)

    def query_with_completeness(
        self,
        pattern: str,
        world: Optional[str] = None,
        require_complete: bool = False,
        additional_facts: Optional[Union[Set[str], List[str]]] = None,
    ) -> Dict[str, Any]:
        """Query with completeness checking for OWA predicates.

        Combines completeness checking with query execution. Before running
        the query, checks if all required OWA predicates have known values.
        Depending on require_complete, either proceeds with uncertain results
        or returns empty results with missing fact information.

        This method is the recommended way to query when dealing with OWA
        predicates, as it makes uncertainty explicit rather than hidden.

        Args:
            pattern: ASP atom pattern like "is_kosher(X)" or "forbidden(food)"
            world: World context for the query (default: default_world)
            require_complete: If True, return empty results when incomplete.
                If False, run query anyway but mark as incomplete.
            additional_facts: Optional set/list of additional facts to include
                in both completeness checking and the query itself.

        Returns:
            Dict containing:
            - complete: bool indicating if all OWA predicates have values
            - results: List of query results (empty if require_complete=True
                and incomplete)
            - missing: List of MissingFact objects for unknown predicates

        Example:
            >>> engine = HsrsEngine(ontology_path)
            >>> result = engine.query_with_completeness(
            ...     pattern="is_kosher(X)",
            ...     world="base",
            ...     require_complete=True,
            ...     additional_facts={"is_food(chicken).", "is_kosher(chicken)."}
            ... )
            >>> if result["complete"]:
            ...     print("Results:", result["results"])
            ... else:
            ...     print("Missing facts:")
            ...     for mf in result["missing"]:
            ...         print(f"  - {mf.question}")
        """
        if not self._is_loaded:
            raise RuntimeError("Ontology not loaded")

        world = world or self.default_world
        checker = self._get_completeness_checker()

        # Prepare facts for completeness check
        facts: Set[str] = set()
        if additional_facts:
            facts = (
                set(additional_facts) if not isinstance(additional_facts, set) else additional_facts
            )

        # Also extract known facts from the engine's current state
        engine_facts = checker._extract_known_facts()
        facts = facts.union(engine_facts)

        # Check completeness
        is_complete, missing = checker.check_completeness(pattern, facts)

        # If require_complete and not complete, return early without querying
        if require_complete and not is_complete:
            return {
                "complete": False,
                "results": [],
                "missing": missing,
                "world": world,
                "query": pattern,
            }

        # Run the query
        if additional_facts:
            # Create a scenario with the additional facts
            scenario = "\n".join(additional_facts)
            analysis = self.analyze(scenario, world)
            # Filter results from atoms to match pattern
            results = self._extract_pattern_matches(pattern, analysis["atoms"])
        else:
            # Use regular query
            results = self.query(pattern)

        return {
            "complete": is_complete,
            "results": results,
            "missing": missing,
            "world": world,
            "query": pattern,
        }

    def _extract_pattern_matches(self, pattern: str, atoms: List[str]) -> List[Dict[str, Any]]:
        """Extract variable bindings from atoms matching a pattern.

        Helper method for query_with_completeness when analyzing scenarios.

        Args:
            pattern: ASP atom pattern with variables (e.g., "is_kosher(X)")
            atoms: List of atom strings from solver

        Returns:
            List of dicts with variable bindings
        """
        if "(" not in pattern:
            return []

        pred_name = pattern.split("(")[0]
        args_str = pattern.split("(")[1].rstrip(")")
        pattern_args = [a.strip() for a in args_str.split(",")]

        results = []
        for atom_str in atoms:
            if atom_str.startswith(pred_name + "("):
                # Extract arguments from atom
                try:
                    atom_args_str = atom_str.split("(")[1].rstrip(")")
                    atom_args = self._parse_atom_args(atom_args_str)

                    binding = {}
                    for i, parg in enumerate(pattern_args):
                        if parg and parg[0].isupper() and i < len(atom_args):
                            binding[parg] = atom_args[i]

                    if binding:
                        results.append(binding)
                except (IndexError, ValueError):
                    continue

        return results

is_loaded: bool property

Return True if ontology is loaded.

__init__(ontology_path: Path, default_world: str = 'base')

Initialize the engine.

Parameters:

Name Type Description Default
ontology_path Path

Path to ontology directory

required
default_world str

Default world for queries

'base'
Source code in mistaber/engine/engine.py
def __init__(self, ontology_path: Path, default_world: str = "base"):
    """
    Initialize the engine.

    The ontology is loaded as the last step of construction, so a newly
    created engine is ready for queries.

    Args:
        ontology_path: Path to ontology directory
        default_world: Default world for queries

    Raises:
        ImportError: If the ``clingo`` module is not available.
    """
    if clingo is None:
        raise ImportError(
            "clingo is required to use HsrsEngine. Install with: pip install clingo"
        )

    self.ontology_path = Path(ontology_path)
    self.default_world = default_world
    self._world_manager = WorldManager()
    # Solver control and loaded flag are filled in by _load_ontology() below.
    self._ctl: Optional[clingo.Control] = None
    self._is_loaded = False
    # Starts as None; the checker is created elsewhere, not in this constructor.
    self._completeness_checker: Optional[CompletenessChecker] = None
    self._config = EngineConfig(world=default_world)

    self._load_ontology()

configure(world: Optional[str] = None, interpretation_precedence: Optional[List[str]] = None, safek_resolution: Optional[str] = None, context: Optional[str] = None, policy: Optional[Dict[str, str]] = None) -> 'HsrsEngine'

Configure engine runtime settings.

Parameters:

Name Type Description Default
world Optional[str]

Default world for queries

None
interpretation_precedence Optional[List[str]]

Ordered list of commentators

None
safek_resolution Optional[str]

How to resolve doubt ("chumra", "kula", "madrega")

None
context Optional[str]

Normative context ("lechatchila", "bediavad")

None
policy Optional[Dict[str, str]]

Policy overrides dict

None

Returns:

Type Description
'HsrsEngine'

self (for method chaining)

Source code in mistaber/engine/engine.py
def configure(
    self,
    world: Optional[str] = None,
    interpretation_precedence: Optional[List[str]] = None,
    safek_resolution: Optional[str] = None,
    context: Optional[str] = None,
    policy: Optional[Dict[str, str]] = None,
) -> "HsrsEngine":
    """
    Update runtime settings and rebuild the solver with them.

    Arguments left as None keep their current value. ``policy`` is merged
    into the existing policy dict rather than replacing it. The ontology is
    reloaded on every call, even when nothing was changed.

    Args:
        world: Default world for queries
        interpretation_precedence: Ordered list of commentators
        safek_resolution: How to resolve doubt ("chumra", "kula", "madrega")
        context: Normative context ("lechatchila", "bediavad")
        policy: Policy overrides dict

    Returns:
        self (for method chaining)
    """
    scalar_overrides = (
        ("world", world),
        ("interpretation_precedence", interpretation_precedence),
        ("safek_resolution", safek_resolution),
        ("context", context),
    )
    for field_name, new_value in scalar_overrides:
        if new_value is not None:
            setattr(self._config, field_name, new_value)

    if policy is not None:
        self._config.policy.update(policy)

    # Rebuild the control so the solver sees the new configuration facts.
    self._load_ontology()
    return self

ask(atom: str, world: Optional[str] = None) -> bool

Boolean query - does this atom hold?

Parameters:

Name Type Description Default
atom str

Ground atom like "world(base)" or "forbidden(W, achiila, m1, ctx)"

required
world Optional[str]

Optional world context (for future use with world-scoped queries)

None

Returns:

Type Description
bool

True if atom holds in any model, False otherwise

Source code in mistaber/engine/engine.py
def ask(self, atom: str, world: Optional[str] = None) -> bool:
    """
    Boolean query: report whether ``query(atom)`` yields at least one match.

    Args:
        atom: Ground atom like "world(base)" or "forbidden(W, achiila, m1, ctx)"
        world: Accepted but currently ignored (reserved for world-scoped queries)

    Returns:
        True if the query produced at least one result, False otherwise

    Raises:
        RuntimeError: If the ontology has not been loaded
    """
    if not self._is_loaded:
        raise RuntimeError("Ontology not loaded")

    # World-scoped reasoning is expressed in ASP through holds/2; at this
    # level the call is only an existence check, so ``world`` is unused.
    del world
    matches = self.query(atom)
    return len(matches) > 0

query(pattern: str, policy: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]

Query for atoms matching a pattern.

Parameters:

Name Type Description Default
pattern str

ASP atom pattern like "world(X)" or "forbidden(W, A, F, C)"

required
policy Optional[Dict[str, str]]

Optional policy overrides dict (merges with config policy)

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with variable bindings

Source code in mistaber/engine/engine.py
def query(self, pattern: str, policy: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
    """
    Match a pattern against the atoms of the first model the solver yields.

    Args:
        pattern: ASP atom pattern like "world(X)" or "forbidden(W, A, F, C)"
        policy: Optional policy overrides dict (merges with config policy)

    Returns:
        List of dicts with variable bindings; empty when no model is produced

    Raises:
        RuntimeError: If the ontology has not been loaded
    """
    if not self._is_loaded:
        raise RuntimeError("Ontology not loaded")

    import copy

    # Default to the engine's loaded control; policy overrides get a
    # throwaway control built from a copy of the config, so the engine's
    # own configuration is left untouched.
    control = self._ctl
    if policy:
        merged_config = copy.deepcopy(self._config)
        merged_config.policy.update(policy)
        control = self._build_control(config=merged_config)

    assert control is not None
    with control.solve(yield_=True) as handle:
        models = iter(handle)
        first_model = next(models, None)
        if first_model is None:
            return []
        symbols = [str(symbol) for symbol in first_model.symbols(atoms=True)]
        return query_atoms(pattern, symbols)

compare(pattern: str, worlds: Optional[List[str]] = None, policy: Optional[Dict[str, str]] = None) -> Dict[str, List[Dict[str, Any]]]

Compare query results across multiple worlds.

Parameters:

Name Type Description Default
pattern str

ASP atom pattern

required
worlds Optional[List[str]]

List of worlds to compare (default: all worlds)

None
policy Optional[Dict[str, str]]

Optional policy overrides dict (merges with config policy)

None

Returns:

Type Description
Dict[str, List[Dict[str, Any]]]

Dict mapping world name to list of results

Source code in mistaber/engine/engine.py
def compare(
    self,
    pattern: str,
    worlds: Optional[List[str]] = None,
    policy: Optional[Dict[str, str]] = None,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Group pattern matches by world, using the first model the solver yields.

    Args:
        pattern: ASP atom pattern
        worlds: List of worlds to compare (default: all worlds)
        policy: Optional policy overrides dict (merges with config policy)

    Returns:
        Dict mapping world name to list of results; empty when no model
        is produced

    Raises:
        RuntimeError: If the ontology has not been loaded
    """
    if not self._is_loaded:
        raise RuntimeError("Ontology not loaded")

    import copy

    # Policy overrides are evaluated on a temporary control built from a
    # copy of the config; otherwise the engine's loaded control is reused.
    control = self._ctl
    if policy:
        merged_config = copy.deepcopy(self._config)
        merged_config.policy.update(policy)
        control = self._build_control(config=merged_config)

    assert control is not None
    with control.solve(yield_=True) as handle:
        models = iter(handle)
        first_model = next(models, None)
        if first_model is None:
            return {}
        symbols = [str(symbol) for symbol in first_model.symbols(atoms=True)]
        return compare_atoms(pattern, symbols, worlds=worlds)

analyze(scenario: str, world: Optional[str] = None, policy: Optional[Dict[str, str]] = None) -> Dict[str, Any]

Analyze a scenario (push mode).

Parameters:

Name Type Description Default
scenario str

ASP code describing the scenario

required
world Optional[str]

World context for analysis (default: default_world)

None
policy Optional[Dict[str, str]]

Optional policy overrides dict (merges with config policy)

None

Returns:

Type Description
Dict[str, Any]

Dict with 'world', 'atoms', and analysis results

Source code in mistaber/engine/engine.py
def analyze(
    self, scenario: str, world: Optional[str] = None, policy: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """
    Solve the ontology together with a scenario and report the atoms found.

    A dedicated control is built from a copy of the engine config (with the
    chosen world and any policy overrides) plus the scenario as extra facts.
    Only the first model is inspected.

    Args:
        scenario: ASP code describing the scenario
        world: World context for analysis (default: default_world)
        policy: Optional policy overrides dict (merges with config policy)

    Returns:
        Dict with 'world', 'atoms' (stringified atoms of the first model, or
        an empty list when there is no model) and 'scenario' (stripped text)

    Raises:
        RuntimeError: If the ontology has not been loaded
    """
    if not self._is_loaded:
        raise RuntimeError("Ontology not loaded")

    import copy

    world = world or self.default_world

    # Work on a copy so the engine's own configuration is never mutated.
    scenario_config = copy.deepcopy(self._config)
    scenario_config.world = world
    if policy:
        scenario_config.policy.update(policy)

    scenario_ctl = self._build_control(config=scenario_config, extra_facts=scenario)

    with scenario_ctl.solve(yield_=True) as handle:
        models = iter(handle)
        first_model = next(models, None)
        if first_model is None:
            atoms = []
        else:
            atoms = [str(symbol) for symbol in first_model.symbols(atoms=True)]

    return {"world": world, "atoms": atoms, "scenario": scenario.strip()}

explain(atom: str, world: Optional[str] = None, additional_facts: str = '') -> Dict[str, Any]

Generate an explanation for why an atom holds using xclingo2.

This method provides derivation tree explanations showing the reasoning chain that led to a conclusion. It uses xclingo2 when available, falling back to basic verification when xclingo is not installed.

The explanation includes:

- The derivation tree structure
- Rule IDs that fired
- Supporting facts and intermediate conclusions
- Source citations (makor) when present in trace annotations

Parameters:

Name Type Description Default
atom str

The atom to explain (e.g., "world(base)", "forbidden(W,A,S,C)")

required
world Optional[str]

World context for the explanation (default: default_world)

None
additional_facts str

Additional ASP facts to include in the program

''

Returns:

Type Description
Dict[str, Any]

Dictionary containing:

Dict[str, Any]
  • atom: The queried atom
Dict[str, Any]
  • world: The world context
Dict[str, Any]
  • using_xclingo: Whether xclingo was used for the explanation
Dict[str, Any]
  • derivation: Dictionary representation of the derivation tree
Dict[str, Any]
  • tree: Human-readable text representation
Dict[str, Any]
  • raw_output: Raw xclingo output (if xclingo was used)
Example

engine = HsrsEngine(ontology_path)
result = engine.explain("world(base)", world="base")
print(result["tree"])

Source code in mistaber/engine/engine.py
def explain(
    self, atom: str, world: Optional[str] = None, additional_facts: str = ""
) -> Dict[str, Any]:
    """
    Explain why an atom holds by delegating to ``XclingoExplainer``.

    The explainer is created from the engine's ontology path and produces a
    derivation-tree explanation of the reasoning chain behind a conclusion.
    It uses xclingo2 when available, falling back to basic verification when
    xclingo is not installed.

    The explanation includes:
    - The derivation tree structure
    - Rule IDs that fired
    - Supporting facts and intermediate conclusions
    - Source citations (makor) when present in trace annotations

    Args:
        atom: The atom to explain (e.g., "world(base)", "forbidden(W,A,S,C)")
        world: World context for the explanation (default: default_world)
        additional_facts: Additional ASP facts to include in the program

    Returns:
        Dictionary containing:
        - atom: The queried atom
        - world: The world context
        - using_xclingo: Whether xclingo was used for the explanation
        - derivation: Dictionary representation of the derivation tree
        - tree: Human-readable text representation
        - raw_output: Raw xclingo output (if xclingo was used)

    Example:
        >>> engine = HsrsEngine(ontology_path)
        >>> result = engine.explain("world(base)", world="base")
        >>> print(result["tree"])
    """
    # Imported here rather than at module level, as in the original listing.
    from mistaber.engine.xclingo_explain import XclingoExplainer

    target_world = world or self.default_world
    return XclingoExplainer(self.ontology_path).explain(atom, target_world, additional_facts)

query_preferred(pattern: str, world: Optional[str] = None) -> List[Dict[str, Any]]

Query with asprin preference optimization.

Returns only the preferred (optimal) outcomes based on the preference ordering defined in preferences.lp:

1. madrega strength (d_oraita > d_rabanan > minhag > chumra)
2. safek handling (stringent for biblical, lenient for rabbinic)
3. world priority (explicit priority declarations)
4. rule specificity (more conditions = more specific = preferred)

If asprin is not available, falls back to regular query.

Parameters:

Name Type Description Default
pattern str

ASP atom pattern like "world(X)" or "derives(W, Conclusion)"

required
world Optional[str]

Optional world context for the query

None

Returns:

Type Description
List[Dict[str, Any]]

List of optimal results according to preference ordering.

List[Dict[str, Any]]

Each result is a dict with variable bindings from the pattern.

Source code in mistaber/engine/engine.py
def query_preferred(self, pattern: str, world: Optional[str] = None) -> List[Dict[str, Any]]:
    """
    Query with asprin preference optimization.

    Returns only the preferred (optimal) outcomes based on the preference
    ordering defined in preferences.lp:
    1. madrega strength (d_oraita > d_rabanan > minhag > chumra)
    2. safek handling (stringent for biblical, lenient for rabbinic)
    3. world priority (explicit priority declarations)
    4. rule specificity (more conditions = more specific = preferred)

    If asprin is not available, falls back to regular query. The same
    fallback is used when asprin exits with an unexpected return code,
    exceeds the 30 second timeout, or cannot be executed. The fallback calls
    ``self.query(pattern)`` and therefore does not apply ``world``.

    Args:
        pattern: ASP atom pattern like "world(X)" or "derives(W, Conclusion)"
        world: Optional world context for the query (default: default_world)

    Returns:
        List of optimal results according to preference ordering.
        Each result is a dict with variable bindings from the pattern.

    Raises:
        RuntimeError: If the ontology has not been loaded
    """
    import copy
    import os
    import subprocess
    import tempfile

    from mistaber.engine.external_tools import find_tool

    if not self._is_loaded:
        raise RuntimeError("Ontology not loaded")

    world = world or self.default_world

    # Without an asprin executable there is nothing to optimize with.
    asprin_bin = find_tool("asprin")
    if asprin_bin is None:
        return self.query(pattern)

    # Copy the config so the per-call world does not leak into the engine.
    cfg = copy.deepcopy(self._config)
    cfg.world = world

    # Build the file list using real paths to preserve #include resolution.
    schema_files = sorted((self.ontology_path / "schema").glob("*.lp"))
    base_files = sorted((self.ontology_path / "base").glob("*.lp"))
    world_files = sorted((self.ontology_path / "worlds").glob("*.lp"))

    # Engine rule files live in an "engine" directory next to the ontology
    # directory; files that do not exist are dropped when the command is built.
    engine_dir = self.ontology_path.parent / "engine"
    engine_files = [
        engine_dir / "safek.lp",
        engine_dir / "priorities.lp",
        engine_dir / "interpretations.lp",
        engine_dir / "policy.lp",
        engine_dir / "preferences.lp",
    ]

    # The interpretations directory is optional.
    interp_dir = self.ontology_path / "interpretations"
    interp_files = sorted(interp_dir.glob("*.lp")) if interp_dir.exists() else []

    # Inject runtime config and force showing all atoms so pattern matching
    # does not depend on existing #show directives in the ontology.
    # delete=False keeps the file on disk after the ``with`` block so asprin
    # can read it by path; it is removed in the ``finally`` clause below.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".lp", delete=False) as f:
        f.write(cfg.to_asp_facts())
        f.write("\n")
        f.write(self._asprin_show_for_pattern(pattern))
        temp_path = f.name

    try:
        # Run asprin with optimization
        result = subprocess.run(
            [
                asprin_bin,
                "-q",
                "1",  # print only optimal models
                # NOTE(review): "-n 1" presumably limits the run to a single
                # model - confirm against the asprin CLI documentation.
                "-n",
                "1",
                *map(str, schema_files + base_files + world_files),
                *[str(p) for p in engine_files if p.exists()],
                *map(str, interp_files),
                temp_path,
            ],
            capture_output=True,
            text=True,
            timeout=30,
        )

        # NOTE(review): 0/10/20/30 look like clingo-style exit codes for
        # normal termination - confirm against the solver documentation.
        if result.returncode not in [0, 10, 20, 30]:
            # asprin failed unexpectedly, fall back to regular query
            return self.query(pattern)

        # Parse asprin output
        return self._parse_asprin_output(result.stdout, pattern)

    except (subprocess.TimeoutExpired, FileNotFoundError):
        # Fall back to regular query if asprin fails
        return self.query(pattern)
    finally:
        # Best-effort cleanup of the temporary program file. temp_path is
        # always bound before the ``try`` starts, so the NameError branch is
        # purely defensive.
        try:
            os.unlink(temp_path)
        except (OSError, NameError):
            pass

check_completeness(query: str, facts: Union[Set[str], List[str]]) -> Tuple[bool, List[MissingFact]]

Check if all required OWA predicates have known values for the query.

In Open World Assumption (OWA), missing facts are unknown (not false). Before querying, this method verifies that all required OWA predicates have values for the relevant entities. If not, the query result would be uncertain (safek).

This is essential for halachic reasoning where absence of information should lead to questions rather than assumptions.

Parameters:

Name Type Description Default
query str

ASP atom pattern (e.g., "is_kosher(chicken)")

required
facts Union[Set[str], List[str]]

Set or list of fact strings representing known facts (e.g., {"is_food(chicken).", "is_kosher(chicken)."})

required

Returns:

Type Description
bool

Tuple of (is_complete, missing_facts):

List[MissingFact]
  • is_complete: True if all required OWA predicates have known values
Tuple[bool, List[MissingFact]]
  • missing_facts: List of MissingFact objects for unknown predicates
Example

engine = HsrsEngine(ontology_path)
facts = {"is_food(chicken).", "food_cat(chicken, basar)."}
is_complete, missing = engine.check_completeness("is_kosher(chicken)", facts)
if not is_complete:
    for mf in missing:
        print(f"Need to know: {mf.question}")

Source code in mistaber/engine/engine.py
def check_completeness(
    self, query: str, facts: Union[Set[str], List[str]]
) -> Tuple[bool, List[MissingFact]]:
    """Report whether every OWA predicate required by the query is known.

    Under the Open World Assumption (OWA) a missing fact is unknown rather
    than false, so a query that depends on unknown predicates would give an
    uncertain (safek) answer. This method delegates to the engine's
    completeness checker so callers can ask for the missing information
    instead of assuming it.

    Args:
        query: ASP atom pattern (e.g., "is_kosher(chicken)")
        facts: Set or list of fact strings representing known facts
            (e.g., {"is_food(chicken).", "is_kosher(chicken)."})

    Returns:
        Tuple of (is_complete, missing_facts):
        - is_complete: True if all required OWA predicates have known values
        - missing_facts: List of MissingFact objects for unknown predicates

    Example:
        >>> engine = HsrsEngine(ontology_path)
        >>> facts = {"is_food(chicken).", "food_cat(chicken, basar)."}
        >>> is_complete, missing = engine.check_completeness(
        ...     "is_kosher(chicken)", facts
        ... )
        >>> if not is_complete:
        ...     for mf in missing:
        ...         print(f"Need to know: {mf.question}")
    """
    return self._get_completeness_checker().check_completeness(query, facts)

query_with_completeness(pattern: str, world: Optional[str] = None, require_complete: bool = False, additional_facts: Optional[Union[Set[str], List[str]]] = None) -> Dict[str, Any]

Query with completeness checking for OWA predicates.

Combines completeness checking with query execution. Before running the query, checks if all required OWA predicates have known values. Depending on require_complete, either proceeds with uncertain results or returns empty results with missing fact information.

This method is the recommended way to query when dealing with OWA predicates, as it makes uncertainty explicit rather than hidden.

Parameters:

Name Type Description Default
pattern str

ASP atom pattern like "is_kosher(X)" or "forbidden(food)"

required
world Optional[str]

World context for the query (default: default_world)

None
require_complete bool

If True, return empty results when incomplete. If False, run query anyway but mark as incomplete.

False
additional_facts Optional[Union[Set[str], List[str]]]

Optional set/list of additional facts to include in both completeness checking and the query itself.

None

Returns:

Type Description
Dict[str, Any]

Dict containing:

Dict[str, Any]
  • complete: bool indicating if all OWA predicates have values
Dict[str, Any]
  • results: List of query results (empty if require_complete=True and incomplete)
Dict[str, Any]
  • missing: List of MissingFact objects for unknown predicates
Example

engine = HsrsEngine(ontology_path)
result = engine.query_with_completeness(
    pattern="is_kosher(X)",
    world="base",
    require_complete=True,
    additional_facts={"is_food(chicken).", "is_kosher(chicken)."},
)
if result["complete"]:
    print("Results:", result["results"])
else:
    print("Missing facts:")
    for mf in result["missing"]:
        print(f"  - {mf.question}")

Source code in mistaber/engine/engine.py
def query_with_completeness(
    self,
    pattern: str,
    world: Optional[str] = None,
    require_complete: bool = False,
    additional_facts: Optional[Union[Set[str], List[str]]] = None,
) -> Dict[str, Any]:
    """Run a query and report whether its OWA inputs are fully known.

    The completeness check runs first, over the caller's additional facts
    combined with the facts the checker extracts from the engine. With
    ``require_complete=True`` an incomplete check short-circuits and no
    query is executed; otherwise the query runs and the result is flagged.

    When additional facts are given they are joined into a scenario and
    evaluated through ``analyze`` for the chosen world; without them the
    regular ``query`` is used.

    Args:
        pattern: ASP atom pattern like "is_kosher(X)" or "forbidden(food)"
        world: World context for the query (default: default_world)
        require_complete: If True, return empty results when incomplete.
            If False, run query anyway but mark as incomplete.
        additional_facts: Optional set/list of additional facts to include
            in both completeness checking and the query itself.

    Returns:
        Dict containing:
        - complete: bool indicating if all OWA predicates have values
        - results: List of query results (empty if require_complete=True
            and incomplete)
        - missing: List of MissingFact objects for unknown predicates
        - world: The world used for the query
        - query: The pattern that was queried

    Raises:
        RuntimeError: If the ontology has not been loaded

    Example:
        >>> engine = HsrsEngine(ontology_path)
        >>> result = engine.query_with_completeness(
        ...     pattern="is_kosher(X)",
        ...     world="base",
        ...     require_complete=True,
        ...     additional_facts={"is_food(chicken).", "is_kosher(chicken)."}
        ... )
        >>> if result["complete"]:
        ...     print("Results:", result["results"])
        ... else:
        ...     print("Missing facts:")
        ...     for mf in result["missing"]:
        ...         print(f"  - {mf.question}")
    """
    if not self._is_loaded:
        raise RuntimeError("Ontology not loaded")

    world = world or self.default_world
    checker = self._get_completeness_checker()

    # Caller-supplied facts as a set (reused as-is when already a set).
    if not additional_facts:
        supplied: Set[str] = set()
    elif isinstance(additional_facts, set):
        supplied = additional_facts
    else:
        supplied = set(additional_facts)

    # union() builds a new set, so the caller's collection is not modified.
    known_facts = supplied.union(checker._extract_known_facts())

    is_complete, missing = checker.check_completeness(pattern, known_facts)

    # Strict mode: report what is missing without running the query.
    if require_complete and not is_complete:
        return dict(complete=False, results=[], missing=missing, world=world, query=pattern)

    if not additional_facts:
        results = self.query(pattern)
    else:
        # Evaluate the extra facts as a scenario, then keep only the atoms
        # that match the requested pattern.
        analysis = self.analyze("\n".join(additional_facts), world)
        results = self._extract_pattern_matches(pattern, analysis["atoms"])

    return dict(
        complete=is_complete, results=results, missing=missing, world=world, query=pattern
    )

Machloket Detection

MachloketDetector

mistaber.engine.machloket.detection.MachloketDetector

Detects disputes between halachic authorities (worlds).

Source code in mistaber/engine/machloket/detection.py
class MachloketDetector:
    """Detects disputes between halachic authorities (worlds).

    A dispute (machloket) is reported for a topic when at least two worlds
    reach different conclusions about it. Conclusions are read from the
    ``holds/2`` atoms of the first model produced by the engine's solver.
    """

    def __init__(self, engine: "HsrsEngine"):
        """Store the engine whose solver state is inspected."""
        self.engine = engine

    def find_machloket(self, pattern: str) -> List[Machloket]:
        """
        Find all machloket for a given pattern across all worlds.

        Args:
            pattern: ASP pattern like "status(X, Y)"

        Returns:
            List of Machloket objects
        """
        # Every world known to the engine takes part in the comparison.
        world_results = self.engine.query("world(W)")
        worlds = [r["W"] for r in world_results]

        return self.find_machloket_between(pattern, worlds)

    def find_machloket_between(self, pattern: str, worlds: List[str]) -> List[Machloket]:
        """
        Find machloket between specific worlds.

        Args:
            pattern: ASP pattern
            worlds: List of worlds to compare

        Returns:
            List of Machloket objects, one per topic on which the given
            worlds hold more than one distinct value
        """
        # Collect conclusions per world
        conclusions: Dict[str, Dict[str, str]] = {}  # topic -> {world: value}

        for world in worlds:
            # Query holds/2 for this world
            for topic, value in self._query_world(pattern, world):
                conclusions.setdefault(topic, {})[world] = value

        # Find disputes (different conclusions for same topic)
        machloket_list = []
        for topic, opinions in conclusions.items():
            if len(set(opinions.values())) > 1:
                machloket_list.append(Machloket(topic=topic, opinions=opinions))

        return machloket_list

    def _query_world(self, pattern: str, world: str) -> List[tuple]:
        """Query for pattern in specific world via holds/2.

        The pattern only selects the predicate name and the positions of the
        topic and value arguments; constant arguments are not compared with
        the atoms found.

        Args:
            pattern: Pattern such as "status(fish_dairy, X)" or "status(X, Y)"
            world: World whose ``holds(Prop, World)`` atoms are inspected

        Returns:
            List of (topic, value) string tuples taken from the first model.
            Empty when the pattern does not identify both a topic and a
            value position.
        """
        results: List[tuple] = []

        # Parse pattern to get predicate structure
        # e.g., "status(fish_dairy, X)" -> pred="status", args=["fish_dairy", "X"]
        # partition() tolerates a pattern without parentheses, which used to
        # raise IndexError.
        pred_name, paren, remainder = pattern.partition("(")
        pred_name = pred_name.strip()
        args = [a.strip() for a in remainder.rstrip(")").split(",")] if paren else []

        # Arguments starting with an uppercase letter are variables; empty
        # arguments are ignored instead of crashing on arg[0].
        variable_idxs = [i for i, arg in enumerate(args) if arg and arg[0].isupper()]
        constant_idxs = [i for i, arg in enumerate(args) if arg and not arg[0].isupper()]

        # As before, the last variable is the value and the last constant the
        # topic. For an all-variable pattern such as "status(X, Y)" the first
        # variable now serves as the topic, so that pattern yields results.
        value_idx = variable_idxs[-1] if variable_idxs else None
        topic_idx = constant_idxs[-1] if constant_idxs else None
        if topic_idx is None and len(variable_idxs) > 1:
            topic_idx = variable_idxs[0]

        if topic_idx is None or value_idx is None:
            return results
        min_arity = max(topic_idx, value_idx) + 1

        # Query holds/2 atoms
        assert self.engine._ctl is not None
        with self.engine._ctl.solve(yield_=True) as handle:
            for model in handle:
                for atom in model.symbols(shown=True):
                    if atom.name != "holds" or len(atom.arguments) != 2:
                        continue
                    prop = atom.arguments[0]
                    if str(atom.arguments[1]) != world or prop.name != pred_name:
                        continue
                    # Same name with fewer arguments is a different
                    # predicate; skip it instead of indexing out of range.
                    if len(prop.arguments) < min_arity:
                        continue
                    results.append(
                        (str(prop.arguments[topic_idx]), str(prop.arguments[value_idx]))
                    )
                # Only the first model is inspected.
                break

        return results

__init__(engine: HsrsEngine)

Source code in mistaber/engine/machloket/detection.py
def __init__(self, engine: "HsrsEngine") -> None:
    """Store the engine this detector works with."""
    self.engine = engine

find_machloket(pattern: str) -> List[Machloket]

Find all machloket for a given pattern across all worlds.

Parameters:

Name Type Description Default
pattern str

ASP pattern like "status(X, Y)"

required

Returns:

Type Description
List[Machloket]

List of Machloket objects

Source code in mistaber/engine/machloket/detection.py
def find_machloket(self, pattern: str) -> List[Machloket]:
    """
    Find all machloket for a given pattern across every world.

    The world list comes from the engine's ``world(W)`` query and is handed
    to ``find_machloket_between``.

    Args:
        pattern: ASP pattern like "status(X, Y)"

    Returns:
        List of Machloket objects
    """
    all_worlds = [binding["W"] for binding in self.engine.query("world(W)")]
    return self.find_machloket_between(pattern, all_worlds)

find_machloket_between(pattern: str, worlds: List[str]) -> List[Machloket]

Find machloket between specific worlds.

Parameters:

Name Type Description Default
pattern str

ASP pattern

required
worlds List[str]

List of worlds to compare

required

Returns:

Type Description
List[Machloket]

List of Machloket objects

Source code in mistaber/engine/machloket/detection.py
def find_machloket_between(self, pattern: str, worlds: List[str]) -> List[Machloket]:
    """
    Find machloket between specific worlds.

    Each world is queried through ``_query_world``; a topic counts as
    disputed when the worlds hold more than one distinct value for it.

    Args:
        pattern: ASP pattern
        worlds: List of worlds to compare

    Returns:
        List of Machloket objects
    """
    # topic -> {world: value}; a later value for the same topic and world
    # overwrites the earlier one.
    per_topic: Dict[str, Dict[str, str]] = {}
    for world in worlds:
        for topic, value in self._query_world(pattern, world):
            per_topic.setdefault(topic, {})[world] = value

    # Keep only topics on which the worlds disagree.
    return [
        Machloket(topic=topic, opinions=opinions)
        for topic, opinions in per_topic.items()
        if len(set(opinions.values())) > 1
    ]

Machloket

mistaber.engine.machloket.detection.Machloket dataclass

Represents a halachic dispute between authorities.

Source code in mistaber/engine/machloket/detection.py
@dataclass
class Machloket:
    """Represents a halachic dispute between authorities.

    Attributes:
        topic: The subject on which the worlds disagree.
        opinions: Mapping of world name to that world's conclusion.
        sources: Mapping of world name to its makor (source); starts as an
            empty dict when not supplied.
    """

    topic: str  # subject of the dispute
    opinions: Dict[str, str]  # world -> conclusion
    sources: Dict[str, str] = field(default_factory=dict)  # world -> makor

DSL Compiler

compile_hll

mistaber.dsl.compiler.compile_hll(source: str, return_warnings: bool = False) -> Union[str, tuple[str, List[TypeCheckError]]]

Compile HLL source to ASP.

This function implements the complete compilation pipeline:

1. Parse HLL source into AST
2. Normalize surface syntax to canonical predicates
3. Type check the normalized AST (stops on errors, continues with warnings)
4. Emit ASP code

Parameters:

Name Type Description Default
source str

HLL source code

required
return_warnings bool

If True, return (asp, warnings) tuple

False

Returns:

Type Description
Union[str, tuple[str, List[TypeCheckError]]]

ASP source code, or (asp, warnings) if return_warnings=True

Raises:

Type Description
CompileError

If parsing or type checking fails with errors

Source code in mistaber/dsl/compiler/compiler.py
def compile_hll(
    source: str, return_warnings: bool = False
) -> Union[str, tuple[str, List[TypeCheckError]]]:
    """
    Compile HLL source to ASP.

    This function implements the complete compilation pipeline:
    1. Parse HLL source into AST
    2. Normalize surface syntax to canonical predicates
    3. Type check the normalized AST (stops on errors, continues with warnings)
    4. Emit ASP code

    Args:
        source: HLL source code
        return_warnings: If True, return (asp, warnings) tuple

    Returns:
        ASP source code, or (asp, warnings) if return_warnings=True

    Raises:
        CompileError: If parsing, normalization, type checking, or emission
            fails. For parse, normalization and emission failures the
            original exception is attached as ``__cause__``.
    """
    # Parse
    parser = HLLParser()
    try:
        ast = parser.parse(source)
    except ParseError as e:
        # ``from e`` records the stage-specific error as the explicit cause.
        raise CompileError(f"Parse error: {e}") from e

    # Normalize (must happen before type checking to expand surface syntax)
    normalizer = Normalizer()
    try:
        normalized_ast = normalizer.normalize(ast)
    except NormalizationError as e:
        raise CompileError(f"Normalization error: {e}") from e

    # Type check (after normalization, on canonical predicates)
    checker = TypeChecker()
    errors = checker.check(normalized_ast)

    # Separate errors from warnings
    hard_errors = [e for e in errors if e.severity == "error"]
    warnings = [e for e in errors if e.severity == "warning"]

    # Hard errors abort compilation; warnings are only reported on request.
    if hard_errors:
        messages = [e.message for e in hard_errors]
        raise CompileError(f"Type errors: {'; '.join(messages)}")

    # Emit ASP
    emitter = ASPEmitter()
    try:
        asp = emitter.emit(normalized_ast)
    except EmitterError as e:
        raise CompileError(f"Emission error: {e}") from e

    if return_warnings:
        return asp, warnings
    return asp

compile_hll_file

mistaber.dsl.compiler.compile_hll_file(path: Path, return_warnings: bool = False) -> Union[str, tuple[str, List[TypeCheckError]]]

Compile an HLL file to ASP.

Parameters:

Name Type Description Default
path Path

Path to HLL source file

required
return_warnings bool

If True, return (asp, warnings) tuple

False

Returns:

Type Description
Union[str, tuple[str, List[TypeCheckError]]]

ASP source code, or (asp, warnings) if return_warnings=True

Raises:

Type Description
CompileError

If parsing, normalization, type checking, or emission fails

FileNotFoundError

If the file does not exist

PermissionError

If the file cannot be read due to permissions

IsADirectoryError

If the path is a directory instead of a file

OSError

If other I/O errors occur

Source code in mistaber/dsl/compiler/compiler.py
def compile_hll_file(
    path: Path, return_warnings: bool = False
) -> Union[str, tuple[str, List[TypeCheckError]]]:
    """
    Read an HLL file as UTF-8 and compile its contents to ASP.

    Args:
        path: Path to HLL source file
        return_warnings: If True, return (asp, warnings) tuple

    Returns:
        ASP source code, or (asp, warnings) if return_warnings=True

    Raises:
        CompileError: If parsing, normalization, type checking, or emission fails
        FileNotFoundError: If the file does not exist
        PermissionError: If the file cannot be read due to permissions
        IsADirectoryError: If the path is a directory instead of a file
        OSError: If other I/O errors occur
    """
    # I/O errors from open()/read() propagate to the caller unchanged.
    with open(path, encoding="utf-8") as hll_file:
        hll_text = hll_file.read()

    return compile_hll(hll_text, return_warnings=return_warnings)

CompileError

mistaber.dsl.compiler.CompileError

Bases: Exception

Raised when compilation fails.

Source code in mistaber/dsl/compiler/compiler.py
class CompileError(Exception):
    """Raised when HLL-to-ASP compilation fails at any stage."""

Completeness Checking

CompletenessChecker

mistaber.engine.completeness.CompletenessChecker

Checks if all required OWA predicates have known values before querying.

In halachic reasoning, many predicates use Open World Assumption (OWA) - the absence of a fact means "unknown" rather than "false". Before we can make definitive conclusions, we need to ensure all relevant OWA predicates have known values.

This class:

1. Loads predicate definitions from the vocabulary registry
2. Identifies which predicates are OWA vs CWA
3. Analyzes queries to determine which predicates are required
4. Checks if all required OWA predicates have known values
5. Generates human-readable questions for missing facts

Attributes:

Name Type Description
OWA_PREDICATES Dict[str, Dict[str, Any]]

Class-level dict mapping OWA predicate names to their signatures and question templates.

Source code in mistaber/engine/completeness.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
class CompletenessChecker:
    """Checks if all required OWA predicates have known values before querying.

    In halachic reasoning, many predicates use Open World Assumption (OWA) -
    the absence of a fact means "unknown" rather than "false". Before we can
    make definitive conclusions, we need to ensure all relevant OWA predicates
    have known values.

    This class:
    1. Loads predicate definitions from the vocabulary registry
    2. Identifies which predicates are OWA vs CWA
    3. Analyzes queries to determine which predicates are required
    4. Checks if all required OWA predicates have known values
    5. Generates human-readable questions for missing facts

    Attributes:
        OWA_PREDICATES: Class-level dict mapping OWA predicate names to their
            signatures and question templates.
    """

    # OWA predicates from base.yaml with signatures and question templates
    # These are predicates where cwa: false in the vocabulary
    OWA_PREDICATES: Dict[str, Dict[str, Any]] = {
        # Kosher status predicates
        "is_kosher": {
            "signature": ["food"],
            "question_template": "Is {0} inherently kosher?",
            "hebrew": "כשר",
        },
        "is_treif": {
            "signature": ["food"],
            "question_template": "Is {0} inherently non-kosher (treif)?",
            "hebrew": "טריף",
        },
        # Issur predicates
        "issur_achiila": {
            "signature": ["food"],
            "question_template": "Does {0} have an eating prohibition?",
            "hebrew": "איסור_אכילה",
        },
        "issur_hanaah": {
            "signature": ["food"],
            "question_template": "Does {0} have a benefit prohibition?",
            "hebrew": "איסור_הנאה",
        },
        # Madrega predicates
        "is_d_oraita": {
            "signature": ["food"],
            "question_template": "Is {0} subject to Biblical-level prohibition?",
            "hebrew": "דאורייתא",
        },
        "is_d_rabanan": {
            "signature": ["food"],
            "question_template": "Is {0} subject to Rabbinic-level prohibition?",
            "hebrew": "דרבנן",
        },
        "is_minhag": {
            "signature": ["food"],
            "question_template": "Is {0} subject to customary practice?",
            "hebrew": "מנהג",
        },
        "is_chumra": {
            "signature": ["food"],
            "question_template": "Is {0} subject to stringency beyond requirement?",
            "hebrew": "חומרה",
        },
        # Status modalities
        "status": {
            "signature": ["food", "status_type"],
            "question_template": "What is the normative status of {0}?",
            "hebrew": "סטטוס",
            "derived": True,
        },
        "assur": {
            "signature": ["food"],
            "question_template": "Is {0} prohibited?",
            "hebrew": "אסור",
        },
        "mutar": {
            "signature": ["food"],
            "question_template": "Is {0} permitted?",
            "hebrew": "מותר",
        },
        "chiyuv": {
            "signature": ["food"],
            "question_template": "Is {0} obligatory?",
            "hebrew": "חיוב",
        },
        "reshut": {
            "signature": ["food"],
            "question_template": "Is {0} optional?",
            "hebrew": "רשות",
        },
        "mitzvah": {
            "signature": ["food"],
            "question_template": "Is {0} a meritorious action?",
            "hebrew": "מצוה",
        },
        "sakanah": {
            "signature": ["food"],
            "question_template": "Is {0} dangerous (medical/safety)?",
            "hebrew": "סכנה",
        },
        # Stringency level
        "lechatchila": {
            "signature": ["food"],
            "question_template": "Is {0} permissible ab initio (lechatchila)?",
            "hebrew": "לכתחילה",
        },
        "bediavad": {
            "signature": ["food"],
            "question_template": "Is {0} permissible post facto (bediavad)?",
            "hebrew": "בדיעבד",
        },
        # Temporal predicates
        "before": {
            "signature": ["time_point", "time_point"],
            "question_template": "Is {0} before {1}?",
            "hebrew": "לפני",
        },
        "at_time": {
            "signature": ["food", "time_point"],
            "question_template": "What is the state of {0} at time {1}?",
            "hebrew": "בזמן",
        },
        # Vessel predicates
        "kli_rishon": {
            "signature": ["vessel"],
            "question_template": "Is {0} a primary cooking vessel (kli rishon)?",
            "hebrew": "כלי_ראשון",
        },
        "kli_sheni": {
            "signature": ["vessel"],
            "question_template": "Is {0} a secondary vessel (kli sheni)?",
            "hebrew": "כלי_שני",
        },
        "kli_shlishi": {
            "signature": ["vessel"],
            "question_template": "Is {0} a tertiary vessel (kli shlishi)?",
            "hebrew": "כלי_שלישי",
        },
        "absorbed_in_vessel": {
            "signature": ["food", "vessel"],
            "question_template": "Has {0} been absorbed into vessel {1}?",
            "hebrew": "נבלע_בכלי",
        },
        # Temperature status
        "is_tzonen": {
            "signature": ["food"],
            "question_template": "Is {0} cold/room temperature?",
            "hebrew": "צונן",
        },
        "is_cham": {
            "signature": ["food"],
            "question_template": "Is {0} hot (yad soledet bo)?",
            "hebrew": "חם",
        },
        "is_roteiach": {
            "signature": ["food"],
            "question_template": "Is {0} boiling/actively cooking?",
            "hebrew": "רותח",
        },
        # Shiur predicates
        "noten_taam": {
            "signature": ["food", "food"],
            "question_template": "Does {0} impart taste to {1}?",
            "hebrew": "נותן_טעם",
        },
        # World structure
        "accessible": {
            "signature": ["world", "world"],
            "question_template": "Is world {1} accessible from world {0}?",
            "hebrew": "נגיש",
        },
        "holds_in": {
            "signature": ["food", "world"],
            "question_template": "Does the property hold for {0} in world {1}?",
            "hebrew": "תקף_ב",
        },
        # Derived conclusions (OWA)
        "permitted": {
            "signature": ["food"],
            "question_template": "Is {0} permitted for consumption?",
            "hebrew": "מותר_לאכול",
            "derived": True,
        },
        "forbidden": {
            "signature": ["food"],
            "question_template": "Is {0} forbidden for consumption?",
            "hebrew": "אסור_לאכול",
            "derived": True,
        },
        "requires_bedika": {
            "signature": ["food"],
            "question_template": "Does {0} require inspection?",
            "hebrew": "טעון_בדיקה",
            "derived": True,
        },
        "requires_sheilah": {
            "signature": ["issue"],
            "question_template": "Does {0} require consultation with authority?",
            "hebrew": "טעון_שאלה",
            "derived": True,
        },
        # Safek predicates
        "safek": {
            "signature": ["food"],
            "question_template": "Is the status of {0} doubtful?",
            "hebrew": "ספק",
        },
        "safek_d_oraita": {
            "signature": ["food"],
            "question_template": "Is {0} subject to Biblical doubt?",
            "hebrew": "ספק_דאורייתא",
        },
        "safek_d_rabanan": {
            "signature": ["food"],
            "question_template": "Is {0} subject to Rabbinic doubt?",
            "hebrew": "ספק_דרבנן",
        },
        "sfek_sfeika": {
            "signature": ["food"],
            "question_template": "Is {0} subject to double doubt?",
            "hebrew": "ספק_ספיקא",
        },
    }

    # CWA predicates - these don't need completeness checking
    # (absence = false in closed world)
    CWA_PREDICATES: Set[str] = {
        "is_food",
        "is_vessel",
        "is_mixture",
        "is_agent",
        "is_location",
        "food_cat",
        "food_type",
        "min_beheima",
        "min_chaya",
        "min_of",
        "min_dag",
        "is_mashkeh",
        "is_tavlin",
        "issur_type",
        "madrega",
        "stringency",
        "context_type",
        "during",
        "kli_status",
        "in_vessel",
        "temp_status",
        "shiur_type",
        "has_shiur",
        "ratio",
        "action_type",
        "performed",
        "achiila",
        "bishul",
        "hanaah",
        "mixed_with",
        "actual_world",
        "policy_setting",
        "follows_shitah",
        "makor",
        "cites",
        "world",
    }

    def __init__(self, engine: Any):
        """Initialize the completeness checker.

        Args:
            engine: HsrsEngine instance for accessing ontology state
        """
        self._engine = engine
        self._registry: Optional[Any] = None
        self._load_predicate_registry()

    def _load_predicate_registry(self) -> None:
        """Load predicate definitions from dsl/vocabulary/base.yaml if available.

        Does nothing when ``YAML_AVAILABLE`` is false. Otherwise the
        ``PredicateRegistry`` class is imported lazily and, if
        ``<ontology_path.parent>/dsl/vocabulary/base.yaml`` exists, the registry
        loaded from it is stored in ``self._registry``.

        Loading is best-effort: any failure (import error, unexpected engine
        shape, malformed file, ...) leaves ``self._registry`` untouched so the
        hardcoded ``OWA_PREDICATES`` dictionary keeps being used. The failure is
        reported at debug level instead of being dropped silently.
        """
        if not YAML_AVAILABLE:
            return

        try:
            # Imported here so a missing vocabulary package only disables the
            # registry instead of breaking import of this module.
            from mistaber.dsl.vocabulary.registry import PredicateRegistry

            # Determine the base.yaml path from engine's ontology_path
            base_yaml = self._engine.ontology_path.parent / "dsl" / "vocabulary" / "base.yaml"

            if base_yaml.exists():
                self._registry = PredicateRegistry.load(base_yaml)
        except Exception as exc:
            # ``Exception`` already covers ImportError and FileNotFoundError;
            # fall back to the hardcoded predicates but leave a trace.
            import logging

            logging.getLogger(__name__).debug(
                "Predicate registry unavailable, using hardcoded OWA predicates: %s", exc
            )

    def check_completeness(
        self, query: str, facts: Union[Set[str], List[str]]
    ) -> Tuple[bool, List[MissingFact]]:
        """Check if all required OWA predicates have known values for the query.

        Analyzes the query to determine which predicates and entities are involved,
        then checks if all OWA predicates have known values (either positive or
        negated assertions).

        Args:
            query: ASP atom pattern (e.g., "is_kosher(chicken)")
            facts: Set or list of fact strings (e.g., {"is_food(chicken)."})

        Returns:
            Tuple of (is_complete, missing_facts):
            - is_complete: True if all required OWA predicates have values
            - missing_facts: List of MissingFact objects for unknown predicates
        """
        if not query or not query.strip():
            # Empty query is trivially complete
            return (True, [])

        # Convert facts to set for efficient lookup
        facts_set = set(facts) if not isinstance(facts, set) else facts

        # Parse the query to extract predicate and entities
        try:
            required_predicates = self._analyze_required_predicates(query)
        except (ValueError, IndexError):
            # Malformed query - treat as complete (can't analyze)
            return (True, [])

        missing: List[MissingFact] = []

        # Extract entities from facts to know what we need to check
        entities_in_query = self._extract_entities_from_query(query)
        entities_in_facts = self._extract_entities_from_facts(facts_set)

        # For queries with variables (X, Y, etc.), check all known entities
        query_has_variable = any(
            c.isupper() and (i == 0 or not query[i - 1].isalpha())
            for i, c in enumerate(query)
            if c.isalpha()
        )

        if query_has_variable:
            # Check entities based on what's declared in facts
            entities_to_check = entities_in_facts
        else:
            entities_to_check = entities_in_query

        # Check each required OWA predicate
        for pred_name in required_predicates:
            if pred_name in self.CWA_PREDICATES:
                # CWA predicates don't need completeness checking
                continue

            if pred_name not in self.OWA_PREDICATES:
                # Unknown predicate - skip (may be user-defined CWA)
                continue

            pred_info = self.OWA_PREDICATES[pred_name]

            # Check if this is a derived predicate
            is_derived = pred_info.get("derived", False)

            # For derived predicates, check if base facts are present
            if is_derived:
                # Derived predicates are complete if their base predicates are complete
                # This is a simplification - full analysis would trace derivation chains
                continue

            # Check if we have values for all relevant entities
            for entity in entities_to_check:
                if not self._fact_is_known(pred_name, entity, facts_set):
                    # Generate question for this missing fact
                    question = self._generate_question(pred_name, entity, pred_info)
                    missing.append(
                        MissingFact(
                            predicate=pred_name, entity=entity, question=question, is_owa=True
                        )
                    )

        is_complete = len(missing) == 0
        return (is_complete, missing)

    def _analyze_required_predicates(self, query: str) -> Set[str]:
        """Analyze query to determine which predicates are required.

        Extracts predicate names from the query pattern. Currently handles
        simple atoms like "pred(args)" and nested terms.

        Args:
            query: ASP atom pattern

        Returns:
            Set of predicate names that appear in the query
        """
        predicates: Set[str] = set()

        if not query:
            return predicates

        # Extract predicate name from pattern like "pred(args)"
        # Handle nested predicates as well

        # Match predicate names followed by opening paren
        pred_pattern = re.compile(r"([a-z_][a-z0-9_]*)\s*\(")
        matches = pred_pattern.findall(query)

        for match in matches:
            predicates.add(match)

        # Also add the main predicate if it doesn't have parens
        if "(" not in query and query.strip():
            predicates.add(query.strip())

        return predicates

    def _extract_entities_from_query(self, query: str) -> Set[str]:
        """Extract entity arguments from query pattern.

        Args:
            query: ASP atom pattern like "is_kosher(chicken)"

        Returns:
            Set of entity names (constants, not variables)
        """
        entities: Set[str] = set()

        if "(" not in query or ")" not in query:
            return entities

        # Extract arguments between parens
        try:
            paren_start = query.index("(")
            paren_end = query.rindex(")")
            args_str = query[paren_start + 1 : paren_end]

            # Parse arguments, handling nested terms
            args = self._parse_args(args_str)

            for arg in args:
                arg = arg.strip()
                # Skip variables (start with uppercase) and numbers
                if arg and not arg[0].isupper() and not arg[0].isdigit():
                    # It's a constant - extract the base entity name
                    if "(" in arg:
                        # Nested term - extract the predicate name
                        arg = arg.split("(")[0]
                    entities.add(arg)
        except (ValueError, IndexError):
            pass

        return entities

    def _extract_entities_from_facts(self, facts: Set[str]) -> Set[str]:
        """Extract all entities mentioned in facts.

        Looks for is_food declarations and other entity-declaring facts.

        Args:
            facts: Set of fact strings

        Returns:
            Set of entity names
        """
        entities: Set[str] = set()

        # Patterns for entity-declaring predicates
        entity_declaring_preds = ["is_food", "is_vessel", "is_mixture", "is_agent", "is_location"]

        for fact in facts:
            fact = fact.strip().rstrip(".")
            for pred in entity_declaring_preds:
                if fact.startswith(f"{pred}("):
                    try:
                        paren_start = fact.index("(")
                        paren_end = fact.rindex(")")
                        entity = fact[paren_start + 1 : paren_end].strip()
                        if entity and not entity[0].isupper():
                            entities.add(entity)
                    except (ValueError, IndexError):
                        pass

        return entities

    def _parse_args(self, args_str: str) -> List[str]:
        """Parse comma-separated arguments handling nested terms.

        Args:
            args_str: String like "arg1, arg2, nested(a,b)"

        Returns:
            List of argument strings
        """
        args = []
        current = []
        paren_depth = 0

        for char in args_str:
            if char == "(":
                paren_depth += 1
                current.append(char)
            elif char == ")":
                paren_depth -= 1
                current.append(char)
            elif char == "," and paren_depth == 0:
                args.append("".join(current).strip())
                current = []
            else:
                current.append(char)

        if current:
            args.append("".join(current).strip())

        return args

    def _fact_is_known(self, pred_name: str, entity: str, facts: Set[str]) -> bool:
        """Check if a fact is known (either positive or negated assertion).

        Args:
            pred_name: Predicate name to check
            entity: Entity to check
            facts: Set of known facts

        Returns:
            True if the fact or its negation is present in facts
        """
        # Check for positive assertion
        positive_patterns = [
            f"{pred_name}({entity}).",
            f"{pred_name}({entity})",
            f"{pred_name}( {entity} ).",
            f"{pred_name}( {entity} )",
        ]

        # Check for negated assertion
        negated_patterns = [
            f"-{pred_name}({entity}).",
            f"-{pred_name}({entity})",
            f"-{pred_name}( {entity} ).",
            f"-{pred_name}( {entity} )",
            f"not {pred_name}({entity}).",
            f"not {pred_name}({entity})",
        ]

        for pattern in positive_patterns + negated_patterns:
            if pattern in facts:
                return True

        # Also check with different whitespace
        for fact in facts:
            fact_clean = fact.strip().rstrip(".")
            # Remove potential leading negation for comparison
            fact_for_check = fact_clean.lstrip("-").strip()
            if fact_for_check.startswith("not "):
                fact_for_check = fact_for_check[4:].strip()

            # Check if this fact is about our predicate and entity
            if fact_for_check.startswith(f"{pred_name}("):
                try:
                    paren_start = fact_for_check.index("(")
                    paren_end = fact_for_check.rindex(")")
                    args_str = fact_for_check[paren_start + 1 : paren_end]
                    args = self._parse_args(args_str)
                    if args and args[0].strip() == entity:
                        return True
                except (ValueError, IndexError):
                    continue

        return False

    def _generate_question(self, pred_name: str, entity: str, pred_info: Dict[str, Any]) -> str:
        """Generate a human-readable question for a missing fact.

        Args:
            pred_name: Predicate name
            entity: Entity argument
            pred_info: Predicate info from OWA_PREDICATES

        Returns:
            Human-readable question string
        """
        template: str = pred_info.get("question_template", "What is {0} for {1}?")

        # Format template with entity
        try:
            question: str = template.format(entity, entity)
        except (IndexError, KeyError):
            question = f"What is the value of {pred_name}({entity})?"

        return question

    def get_questions(self, query: str, facts: Union[Set[str], List[str]]) -> List[str]:
        """Get human-readable questions for missing facts.

        Convenience method that returns just the question strings from
        check_completeness.

        Args:
            query: ASP atom pattern
            facts: Set or list of fact strings

        Returns:
            List of question strings for missing facts
        """
        is_complete, missing = self.check_completeness(query, facts)
        return [mf.question for mf in missing]

    def _extract_known_facts(self) -> Set[str]:
        """Extract facts from the engine's current state.

        Queries the engine's clingo control to get all currently known atoms.

        Returns:
            Set of fact strings
        """
        facts: Set[str] = set()

        if not self._engine._is_loaded or self._engine._ctl is None:
            return facts

        try:
            with self._engine._ctl.solve(yield_=True) as handle:
                for model in handle:
                    for atom in model.symbols(shown=True):
                        facts.add(f"{atom}.")
                    break  # Only need first model
        except Exception:
            pass

        return facts

__init__(engine: Any)

Initialize the completeness checker.

Parameters:

Name Type Description Default
engine Any

HsrsEngine instance for accessing ontology state

required
Source code in mistaber/engine/completeness.py
def __init__(self, engine: Any):
    """Initialize the completeness checker.

    Stores the engine, starts with no predicate registry, and then calls
    ``_load_predicate_registry`` to try to populate it.

    Args:
        engine: HsrsEngine instance for accessing ontology state
    """
    # The engine must be stored first: the registry loader reads it.
    self._engine = engine
    self._registry: Optional[Any] = None
    self._load_predicate_registry()

check_completeness(query: str, facts: Union[Set[str], List[str]]) -> Tuple[bool, List[MissingFact]]

Check if all required OWA predicates have known values for the query.

Analyzes the query to determine which predicates and entities are involved, then checks if all OWA predicates have known values (either positive or negated assertions).

Parameters:

Name Type Description Default
query str

ASP atom pattern (e.g., "is_kosher(chicken)")

required
facts Union[Set[str], List[str]]

Set or list of fact strings (e.g., {"is_food(chicken)."})

required

Returns:

Type Description
Tuple[bool, List[MissingFact]]

Tuple of (is_complete, missing_facts):
  • is_complete: True if all required OWA predicates have values
  • missing_facts: List of MissingFact objects for unknown predicates
Source code in mistaber/engine/completeness.py
def check_completeness(
    self, query: str, facts: Union[Set[str], List[str]]
) -> Tuple[bool, List[MissingFact]]:
    """Check if all required OWA predicates have known values for the query.

    The predicates named in the query are collected, and every one that is
    listed in ``OWA_PREDICATES`` (and not marked ``derived``) must have a
    known value - positive or negated - for each entity under inspection.

    Which entities are inspected depends on the query:

    - if the query contains a variable (a token starting with an uppercase
      letter, optionally preceded by underscores, or the anonymous ``_``),
      every entity declared in ``facts`` is checked;
    - otherwise only the constants appearing in the query are checked.

    Predicates in ``CWA_PREDICATES``, predicates unknown to both tables and
    derived predicates are skipped. Missing facts are reported sorted by
    predicate name, then entity, so the result is stable across runs.

    Args:
        query: ASP atom pattern (e.g., "is_kosher(chicken)")
        facts: Set or list of fact strings (e.g., {"is_food(chicken)."})

    Returns:
        Tuple of (is_complete, missing_facts):
        - is_complete: True if all required OWA predicates have values
        - missing_facts: List of MissingFact objects for unknown predicates
    """
    if not query or not query.strip():
        # Empty query is trivially complete
        return (True, [])

    # Convert facts to set for efficient lookup
    facts_set = facts if isinstance(facts, set) else set(facts)

    try:
        required_predicates = self._analyze_required_predicates(query)
    except (ValueError, IndexError):
        # Malformed query - treat as complete (can't analyze)
        return (True, [])

    # A variable must start its own token: an uppercase letter inside a
    # constant such as "beef_Wellington" or "kli_2A" does not make the
    # query non-ground. A lone run of underscores is the anonymous variable.
    query_has_variable = (
        re.search(r"(?<![A-Za-z0-9_])(?:_*[A-Z]|_+(?![A-Za-z0-9_]))", query)
        is not None
    )

    if query_has_variable:
        # Non-ground query: check every entity declared in the facts
        entities_to_check = self._extract_entities_from_facts(facts_set)
    else:
        entities_to_check = self._extract_entities_from_query(query)

    missing: List[MissingFact] = []

    # Sorted iteration keeps the order of reported gaps deterministic;
    # plain set iteration order for strings can differ between runs.
    for pred_name in sorted(required_predicates):
        if pred_name in self.CWA_PREDICATES:
            # CWA predicates don't need completeness checking
            continue

        if pred_name not in self.OWA_PREDICATES:
            # Unknown predicate - skip (may be user-defined CWA)
            continue

        pred_info = self.OWA_PREDICATES[pred_name]

        if pred_info.get("derived", False):
            # Derived predicates are complete if their base predicates are
            # complete. This is a simplification - full analysis would
            # trace derivation chains.
            continue

        for entity in sorted(entities_to_check):
            if self._fact_is_known(pred_name, entity, facts_set):
                continue
            missing.append(
                MissingFact(
                    predicate=pred_name,
                    entity=entity,
                    question=self._generate_question(pred_name, entity, pred_info),
                    is_owa=True,
                )
            )

    return (not missing, missing)

get_questions(query: str, facts: Union[Set[str], List[str]]) -> List[str]

Get human-readable questions for missing facts.

Convenience method that returns just the question strings from check_completeness.

Parameters:

Name Type Description Default
query str

ASP atom pattern

required
facts Union[Set[str], List[str]]

Set or list of fact strings

required

Returns:

Type Description
List[str]

List of question strings for missing facts

Source code in mistaber/engine/completeness.py
def get_questions(self, query: str, facts: Union[Set[str], List[str]]) -> List[str]:
    """Return only the question text for each fact the query is missing.

    Thin wrapper around ``check_completeness`` that discards the
    completeness flag and keeps the ``question`` of every reported gap,
    in the order they were reported.

    Args:
        query: ASP atom pattern
        facts: Set or list of fact strings

    Returns:
        List of question strings for missing facts
    """
    _, gaps = self.check_completeness(query, facts)
    return [gap.question for gap in gaps]

MissingFact

mistaber.engine.completeness.MissingFact dataclass

Represents a missing OWA predicate value for an entity.

When completeness checking identifies that an OWA predicate does not have a known value for an entity, a MissingFact is created to represent this gap.

Attributes:

Name Type Description
predicate str

The predicate name (e.g., "is_kosher")

entity str

The entity argument (e.g., "chicken")

question str

Human-readable question to resolve the missing fact

is_owa bool

True if this is an Open World Assumption predicate

Source code in mistaber/engine/completeness.py
@dataclass
class MissingFact:
    """Represents a missing OWA predicate value for an entity.

    When completeness checking identifies that an OWA predicate does not have
    a known value for an entity, a MissingFact is created to represent this gap.
    Instances are plain data records; the only behavior is the string form
    produced by ``__str__``.

    Attributes:
        predicate: The predicate name (e.g., "is_kosher")
        entity: The entity argument (e.g., "chicken")
        question: Human-readable question to resolve the missing fact
        is_owa: True if this is an Open World Assumption predicate
            (defaults to True)
    """

    predicate: str
    entity: str
    question: str
    is_owa: bool = True

    def __str__(self) -> str:
        """Return ``Missing <predicate>(<entity>): <question>``."""
        return f"Missing {self.predicate}({self.entity}): {self.question}"

__str__() -> str

Return a string representation of the missing fact.

Source code in mistaber/engine/completeness.py
def __str__(self) -> str:
    """Return a string representation of the missing fact.

    The text has the form ``Missing <predicate>(<entity>): <question>``,
    built from the instance's ``predicate``, ``entity`` and ``question``
    attributes.

    Returns:
        The formatted description.
    """
    return f"Missing {self.predicate}({self.entity}): {self.question}"

Explanation

Derivation

mistaber.engine.explain.derivation.Derivation dataclass

Represents a derivation tree node.

Each node contains:

- The conclusion (derived atom)
- The rule that derived it
- The world it holds in
- Its madrega (obligation level)
- Source citations (makor)
- Premise derivations (recursive)
- Inheritance info (if inherited from parent world)

Source code in mistaber/engine/explain/derivation.py
@dataclass
class Derivation:
    """
    Represents a derivation tree node.

    Each node contains:
    - The conclusion (derived atom)
    - The rule that derived it
    - The world it holds in
    - Its madrega (obligation level)
    - Source citations (makor)
    - Premise derivations (recursive)
    - Inheritance info (if inherited from parent world)
    """

    conclusion: str
    rule_id: str
    world: str
    madrega: Optional[str] = None
    makor: List[str] = field(default_factory=list)
    certainty: str = "vadai"
    premises: List["Derivation"] = field(default_factory=list)
    inherited_from: Optional[str] = None
    interpretation: Optional[str] = None