"""
This module defines the LeanProof class for executing Lean proofs.
"""
import asyncio
import json
import logging
import resource
from ..config import Config
from ..utils.uuid.uuid import uuid
from .proto import LeanProofConfig, LeanProofResult, LeanProofStatus
logger = logging.getLogger(__name__)
def _set_memory_limit(memory_limit_mb: int) -> None:
"""
Set memory limit for the current process.
Args:
memory_limit_mb: Memory limit in MB
"""
try:
# Convert MB to bytes
memory_limit_bytes = memory_limit_mb * 1024 * 1024
# Set soft and hard limits for memory
resource.setrlimit(resource.RLIMIT_AS, (memory_limit_bytes, memory_limit_bytes))
logger.debug(f"Set memory limit to {memory_limit_mb} MB")
except Exception as e:
logger.warning(f"Failed to set memory limit: {e}")
[docs]
class LeanProof:
"""
Represents a Lean proof that can be executed.
This class encapsulates the Lean code for a proof and provides a method
to execute it as a subprocess. It handles communication with the Lean
process, including timeouts and error handling.
Attributes:
lean_code (str): The Lean code of the proof.
proof_id (str): A unique identifier for the proof.
config (Config): The application configuration.
"""
def __init__(self, *, proof_id: str | None = None, proof: str, config: Config):
"""
Initializes a LeanProof instance.
Args:
proof_id: An optional unique identifier for the proof. If not
provided, a new UUID will be generated.
proof: The Lean code for the proof.
config: The application configuration.
"""
self.lean_code = proof
if proof_id is None:
self.proof_id = uuid()
else:
self.proof_id = proof_id
self.config = config
[docs]
async def execute(self, config: LeanProofConfig) -> LeanProofResult:
"""
Executes the Lean proof as a subprocess.
This method runs the Lean executable, sends the proof code to it, and
waits for the result. It handles timeouts, process errors, and JSON
parsing of the output.
Args:
config: The configuration for this specific proof execution.
Returns:
A LeanProofResult object containing the status and result of the
proof execution.
"""
proc = None
try:
command = {
"cmd": self.lean_code,
"allTactics": config.all_tactics,
"ast": config.ast,
"tactics": config.tactics,
"premises": config.premises,
}
logger.info(f"Executing command: {command}")
# Create subprocess with proper resource management
def preexec_fn():
"""Set memory limit in child process."""
_set_memory_limit(config.memory_limit_mb)
proc = await asyncio.create_subprocess_exec(
self.config.lean.executable,
"exe",
"repl",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.config.lean.workspace,
preexec_fn=preexec_fn,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=json.dumps(command).encode("utf-8")),
timeout=config.timeout,
)
except TimeoutError:
logger.error(f"Lean process timed out after {config.timeout} seconds")
# Forcefully terminate the process
if proc.returncode is None:
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=5.0)
except TimeoutError:
logger.warning("Force killing Lean process")
proc.kill()
try:
await proc.wait()
except Exception as kill_error:
logger.warning(
f"Failed to wait for killed process: {kill_error}"
)
return LeanProofResult(
status=LeanProofStatus.ERROR,
error_message=f"Process timed out after {config.timeout} seconds",
result={"status": "timeout"},
)
# Check process return code
if proc.returncode != 0:
logger.warning(f"Lean process exited with code {proc.returncode}")
# Check if process was killed due to memory limit
if proc.returncode == -9: # SIGKILL
error_msg = (
f"Process was killed due to memory limit "
f"({config.memory_limit_mb} MB)"
)
result_status = "memory_limit_exceeded"
else:
error_msg = (
stderr.decode("utf-8")
if stderr
else f"Process exited with code {proc.returncode}"
)
result_status = "process_error"
return LeanProofResult(
status=LeanProofStatus.ERROR,
error_message=error_msg,
result={"status": result_status, "return_code": proc.returncode},
)
error_message = stderr.decode("utf-8") if stderr else None
try:
result = json.loads(stdout.decode("utf-8"))
status = LeanProofStatus.FINISHED
except json.JSONDecodeError as e:
logger.error(f"Error parsing JSON: {e}")
result = {
"raw": stdout.decode("utf-8"),
"parse_error_message": str(e),
}
status = LeanProofStatus.ERROR
result, success = self._handle_result(
result=result,
hide_warnings=True,
)
return LeanProofResult(
success=success,
status=status,
result=result,
error_message=error_message,
)
except Exception as e:
logger.error(f"Error executing proof: {e}", exc_info=True)
# Ensure process cleanup in case of unexpected errors
if proc and proc.returncode is None:
try:
proc.terminate()
await asyncio.wait_for(proc.wait(), timeout=5.0)
except Exception as cleanup_error:
logger.warning(f"Failed to cleanup process: {cleanup_error}")
try:
proc.kill()
await proc.wait()
except Exception:
pass # Best effort cleanup
return LeanProofResult(
status=LeanProofStatus.ERROR,
error_message=str(e),
result={"status": "failed", "error_type": type(e).__name__},
)
def _has_error(self, result: dict) -> bool:
"""Check if a proof result has any error messages."""
if result and "messages" in result:
for msg in result["messages"]:
if msg.get("severity") == "error":
return True
return False
def _handle_result(
self, result: dict, hide_warnings: bool = True
) -> tuple[dict, bool]:
"""
Process the result dictionary returned from Lean.
This method filters out warning messages if requested and determines
the overall success of the proof based on the presence of errors.
Args:
result: The original result dictionary from Lean.
hide_warnings: If True, removes all messages with
severity 'warning'.
Returns:
A tuple containing the processed result dictionary and a boolean
indicating success.
"""
success = True
final_result_messages = []
if hide_warnings and "messages" in result:
for msg in result["messages"]:
if msg.get("severity") == "error":
success = False
elif msg.get("severity") == "warning":
if hide_warnings:
continue
final_result_messages.append(msg)
result["messages"] = final_result_messages
return result, success