"""Manages the lifecycle of Lean proofs, from submission to execution and result retrieval."""importasyncioimportloggingfrom..database.proofimportProofDatabasefrom..proof.leanimportLeanProoffrom..proof.protoimportLeanProofConfig,LeanProofResult,LeanProofStatuslogger=logging.getLogger(__name__)
class ProofManager:
    """
    Handles the submission, execution, and tracking of Lean proofs.

    This class coordinates with the database to store proof data and results.
    It uses a semaphore to limit concurrent proof executions and manages
    background tasks for running proofs asynchronously.

    Attributes:
        proof_database (ProofDatabase): An instance for interacting with the
            proof database.
        lean_semaphore (asyncio.Semaphore): A semaphore to limit concurrent
            Lean processes.
        background_tasks (set[asyncio.Task]): A set of currently running
            background proof tasks.
    """

    def __init__(
        self,
        *,
        proof_database: ProofDatabase,
        lean_semaphore: asyncio.Semaphore,
        background_tasks: set[asyncio.Task],
    ):
        """
        Initializes the ProofManager.

        Args:
            proof_database: The database handler for proofs.
            lean_semaphore: The semaphore to control concurrency.
            background_tasks: A set to manage background asyncio tasks. The
                set is stored by reference, not copied, so the caller sees
                tasks being added and removed.
        """
        self.proof_database = proof_database
        self.lean_semaphore = lean_semaphore
        self.background_tasks = background_tasks

    def _on_task_done(self, task: asyncio.Task) -> None:
        """Drops a finished background task and logs it if it failed."""
        self.background_tasks.discard(task)
        # Task.exception() raises CancelledError on a cancelled task, so
        # cancellation has to be ruled out before inspecting the outcome.
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            # Retrieving the exception here keeps failures that escape
            # run_proof (for example a database error before its try block)
            # from going unnoticed until the task is garbage-collected.
            logger.error("Background proof task failed", exc_info=error)

    async def submit_proof(
        self,
        *,
        proof: LeanProof,
        config: LeanProofConfig,
    ) -> dict[str, str]:
        """
        Submits a proof for execution.

        If the database reports that the proof already exists, the ID it
        returns is handed back immediately and nothing is scheduled.
        Otherwise the proof's hash is inserted into the database and a
        background task is created to run it; this method does not wait for
        that task to finish.

        Args:
            proof: The LeanProof object to be executed.
            config: The configuration for the proof execution.

        Returns:
            A dictionary with a single ``"id"`` key holding the proof ID.
        """
        existing_id = await self.proof_database.proof_exists(proof=proof)
        if existing_id:
            logger.info(f"Proof already submitted: {existing_id}, returning existing proof ID")
            return {"id": existing_id}

        await self.proof_database.insert_hash(proof=proof)

        task = asyncio.create_task(self.run_proof(proof=proof, config=config))
        # The event loop only keeps weak references to tasks, so the task is
        # held in the shared set until its done-callback removes it.
        self.background_tasks.add(task)
        task.add_done_callback(self._on_task_done)
        logger.info(f"Submitted proof: {proof.proof_id}")
        return {"id": proof.proof_id}

    async def run_proof(
        self,
        *,
        proof: LeanProof,
        config: LeanProofConfig,
    ) -> LeanProofResult:
        """
        Executes a single Lean proof.

        If a result for the proof is already stored, execution is skipped and
        the stored result is returned. Otherwise the status is set to PENDING,
        the semaphore is acquired, the status is set to RUNNING, the proof is
        executed, and the result and its final status are written to the
        database.

        An exception raised while executing the proof or storing its result is
        caught: it is logged with its traceback, the status is set to ERROR,
        and an ERROR result carrying the exception message is returned. That
        ERROR result is not passed to ``insert_proof``.

        Args:
            proof: The LeanProof object to execute.
            config: The configuration for the proof execution.

        Returns:
            The result of the proof execution, the previously stored result,
            or an ERROR result if execution failed.
        """
        proof_id = await self.proof_database.proof_exists(proof=proof)
        if proof_id is None:
            # Direct callers may skip submit_proof, so register the hash here.
            proof_id = proof.proof_id
            await self.proof_database.insert_hash(proof=proof)

        if await self.proof_database.result_exists(proof_id=proof_id):
            logger.info(f"Proof already exists in database: {proof_id}, skipping execution")
            return await self.get_result(proof_id=proof_id)

        # NOTE(review): status updates below are keyed by proof.proof_id, not
        # by the ID returned from proof_exists. Presumably the two are equal
        # whenever this point is reached -- confirm against ProofDatabase.
        await self.proof_database.update_status(
            proof_id=proof.proof_id, status=LeanProofStatus.PENDING
        )

        async with self.lean_semaphore:
            try:
                logger.info(f"Running proof: {proof}")
                logger.info(f"Config: {config}")
                await self.proof_database.update_status(
                    proof_id=proof.proof_id, status=LeanProofStatus.RUNNING
                )
                result = await proof.execute(config)
                logger.info(f"Proof result: {result}")
                await self.proof_database.insert_proof(
                    proof=proof, config=config, result=result
                )
                await self.proof_database.update_status(
                    proof_id=proof.proof_id, status=result.status
                )
                logger.info("Proof result inserted into database")
                return result
            except Exception as e:
                # logger.exception logs at ERROR level like logger.error did,
                # but also records the traceback of the failure.
                logger.exception(f"Error running proof: {e}")
                await self.proof_database.update_status(
                    proof_id=proof.proof_id, status=LeanProofStatus.ERROR
                )
                return LeanProofResult(
                    status=LeanProofStatus.ERROR,
                    error_message=str(e),
                )

    async def get_result(self, proof_id: str) -> LeanProofResult:
        """
        Retrieves the result of a proof from the database.

        Args:
            proof_id: The unique identifier of the proof.

        Returns:
            Whatever the proof database returns for ``proof_id``.
        """
        return await self.proof_database.get_result(proof_id)