From 01ad15dc0928faab0144ff201ba46bc460cb3960 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Thu, 3 Oct 2024 23:39:08 -0400 Subject: [PATCH] PyO3: migrate to `Bound` in parts of `externs/interface.rs` (#21489) Migrate to the `pyo3::Bound` smart pointer in a portion of `src/rust/engine/src/externs/interface.rs`. --- src/rust/engine/src/externs/interface.rs | 177 ++++++++++++----------- 1 file changed, 92 insertions(+), 85 deletions(-) diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 28b3cf36fdb..43a08cadea7 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -416,17 +416,17 @@ struct PySession(Session); impl PySession { #[new] fn __new__( - scheduler: &PyScheduler, + scheduler: &Bound<'_, PyScheduler>, dynamic_ui: bool, ui_use_prodash: bool, max_workunit_level: u64, build_id: String, session_values: PyObject, - cancellation_latch: &PySessionCancellationLatch, + cancellation_latch: &Bound<'_, PySessionCancellationLatch>, py: Python, ) -> PyO3Result { - let core = scheduler.0.core.clone(); - let cancellation_latch = cancellation_latch.0.clone(); + let core = scheduler.borrow().0.core.clone(); + let cancellation_latch = cancellation_latch.borrow().0.clone(); let py_level: PythonLogLevel = max_workunit_level .try_into() .map_err(|e| PyException::new_err(format!("{e}")))?; @@ -575,7 +575,7 @@ impl PyThreadLocals { #[pymethods] impl PyThreadLocals { #[classmethod] - fn get_for_current_thread(_cls: &PyType) -> Self { + fn get_for_current_thread(_cls: &Bound<'_, PyType>) -> Self { Self::get() } @@ -635,10 +635,10 @@ fn nailgun_server_create( #[pyfunction] fn nailgun_server_await_shutdown( py: Python, - nailgun_server_ptr: &PyNailgunServer, + nailgun_server_ptr: &Bound<'_, PyNailgunServer>, ) -> PyO3Result<()> { - if let Some(server) = nailgun_server_ptr.server.borrow_mut().take() { - let executor = nailgun_server_ptr.executor.clone(); + if let Some(server) = nailgun_server_ptr.borrow().server.borrow_mut().take() { + let executor = nailgun_server_ptr.borrow().executor.clone(); py.allow_threads(|| executor.block_on(server.shutdown())) .map_err(PyException::new_err) } else { @@ -698,18 +698,18 @@ fn hash_prefix_zero_bits(item: &str) -> u32 { /// #[pyfunction] fn scheduler_create( - py_executor: &externs::scheduler::PyExecutor, - py_tasks: &PyTasks, - types_ptr: &PyTypes, + py_executor: &Bound<'_, externs::scheduler::PyExecutor>, + py_tasks: &Bound<'_, PyTasks>, + types_ptr: &Bound<'_, PyTypes>, build_root: PathBuf, local_execution_root_dir: PathBuf, named_caches_dir: PathBuf, ignore_patterns: Vec, use_gitignore: bool, watch_filesystem: bool, - remoting_options: &PyRemotingOptions, - local_store_options: &PyLocalStoreOptions, - exec_strategy_opts: &PyExecutionStrategyOptions, + remoting_options: &Bound<'_, PyRemotingOptions>, + local_store_options: &Bound<'_, PyLocalStoreOptions>, + exec_strategy_opts: &Bound<'_, PyExecutionStrategyOptions>, ca_certs_path: Option, ) -> PyO3Result { match fs::increase_limits() { @@ -717,15 +717,17 @@ fn scheduler_create( Err(e) => warn!("{}", e), } let types = types_ptr + .borrow() .0 .borrow_mut() .take() .ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?; - let tasks = py_tasks.0.replace(Tasks::new()); + let tasks = py_tasks.borrow().0.replace(Tasks::new()); // NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to // use `tokio::spawn` since Python does not setup Tokio for the main thread. This also // ensures that the correct executor is used by those libraries. + let py_executor = py_executor.borrow(); let core = py_executor .0 .enter(|| { @@ -741,9 +743,9 @@ fn scheduler_create( local_execution_root_dir, named_caches_dir, ca_certs_path, - local_store_options.0.clone(), - remoting_options.0.clone(), - exec_strategy_opts.0.clone(), + local_store_options.borrow().0.clone(), + remoting_options.borrow().0.clone(), + exec_strategy_opts.borrow().0.clone(), ) .await }) @@ -1026,18 +1028,17 @@ fn session_poll_workunits( #[pyfunction] fn session_run_interactive_process( py: Python, - py_session: &PySession, + py_session: &Bound<'_, PySession>, interactive_process: PyObject, process_config_from_environment: PyProcessExecutionEnvironment, ) -> PyO3Result { - let core = py_session.0.core(); - let context = py_session - .0 - .core() - .graph - .context(SessionCore::new(py_session.0.clone())); + let core = py_session.borrow().0.core().clone(); + let session = &py_session.borrow().0; + let context = core.graph.context(SessionCore::new(session.clone())); + let interactive_process: Value = interactive_process.into(); let process_config = Value::new(process_config_from_environment.into_py(py)); + py.allow_threads(|| { core.executor.clone().block_on(nodes::task_context( context.clone(), @@ -1053,34 +1054,41 @@ fn session_run_interactive_process( #[pyfunction] fn scheduler_metrics<'py>( py: Python<'py>, - py_scheduler: &'py PyScheduler, - py_session: &'py PySession, -) -> HashMap<&'py str, i64> { - py_scheduler - .0 - .core - .executor - .enter(|| py.allow_threads(|| py_scheduler.0.metrics(&py_session.0))) + py_scheduler: &Bound<'py, PyScheduler>, + py_session: &Bound<'py, PySession>, +) -> HashMap { + let core = py_session.borrow().0.core().clone(); + let scheduler = &py_scheduler.borrow().0; + let session = &py_session.borrow().0; + + core.executor.enter(|| { + py.allow_threads(|| { + let result = scheduler.metrics(session); + result.into_iter().map(|(k, v)| (k.to_owned(), v)).collect() + }) + }) } #[pyfunction] fn scheduler_live_items<'py>( py: Python<'py>, - py_scheduler: &'py PyScheduler, - py_session: &'py PySession, + py_scheduler: &Bound<'py, PyScheduler>, + py_session: &Bound<'_, PySession>, ) -> (Vec, HashMap<&'static str, (usize, usize)>) { - let (items, sizes) = py_scheduler - .0 - .core + let core = py_scheduler.borrow().0.core.clone(); + let scheduler = &py_scheduler.borrow().0; + let session = &py_session.borrow().0; + + let (items, sizes) = core .executor - .enter(|| py.allow_threads(|| py_scheduler.0.live_items(&py_session.0))); + .enter(|| py.allow_threads(|| scheduler.live_items(session))); let py_items = items.into_iter().map(|value| value.to_object(py)).collect(); (py_items, sizes) } #[pyfunction] -fn scheduler_shutdown(py: Python, py_scheduler: &PyScheduler, timeout_secs: u64) { - let core = &py_scheduler.0.core; +fn scheduler_shutdown(py: Python, py_scheduler: &Bound<'_, PyScheduler>, timeout_secs: u64) { + let core = py_scheduler.borrow().0.core.clone(); core.executor.enter(|| { py.allow_threads(|| { core.executor @@ -1144,11 +1152,11 @@ fn execution_add_root_select( #[pyfunction] fn tasks_task_begin( - py_tasks: &PyTasks, + py_tasks: &Bound<'_, PyTasks>, func: PyObject, output_type: &Bound<'_, PyType>, - arg_types: Vec<(String, &PyType)>, - masked_types: Vec<&PyType>, + arg_types: Vec<(String, Bound<'_, PyType>)>, + masked_types: Vec>, side_effecting: bool, engine_aware_return_type: bool, cacheable: bool, @@ -1165,12 +1173,8 @@ fn tasks_task_begin( .into_iter() .map(|(name, typ)| (name, TypeId::new(&typ.as_borrowed()))) .collect(); - let masked_types = masked_types - .into_iter() - .map(|t| TypeId::new(&t.as_borrowed())) - .collect(); - let mut tasks = py_tasks.0.borrow_mut(); - tasks.task_begin( + let masked_types = masked_types.into_iter().map(|t| TypeId::new(&t)).collect(); + py_tasks.borrow_mut().0.borrow_mut().task_begin( func, output_type, side_effecting, @@ -1186,68 +1190,71 @@ fn tasks_task_begin( } #[pyfunction] -fn tasks_task_end(py_tasks: &PyTasks) { - let mut tasks = py_tasks.0.borrow_mut(); - tasks.task_end(); +fn tasks_task_end(py_tasks: &Bound<'_, PyTasks>) { + py_tasks.borrow_mut().0.borrow_mut().task_end(); } #[pyfunction] fn tasks_add_call( - py_tasks: &PyTasks, + py_tasks: &Bound<'_, PyTasks>, output: &Bound<'_, PyType>, - inputs: Vec<&PyType>, + inputs: Vec>, rule_id: String, explicit_args_arity: u16, ) { let output = TypeId::new(output); - let inputs = inputs - .into_iter() - .map(|t| TypeId::new(&t.as_borrowed())) - .collect(); - let mut tasks = py_tasks.0.borrow_mut(); - tasks.add_call(output, inputs, rule_id, explicit_args_arity); + let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect(); + py_tasks + .borrow_mut() + .0 + .borrow_mut() + .add_call(output, inputs, rule_id, explicit_args_arity); } #[pyfunction] -fn tasks_add_get(py_tasks: &PyTasks, output: &Bound<'_, PyType>, inputs: Vec<&PyType>) { +fn tasks_add_get( + py_tasks: &Bound<'_, PyTasks>, + output: &Bound<'_, PyType>, + inputs: Vec>, +) { let output = TypeId::new(output); - let inputs = inputs - .into_iter() - .map(|t| TypeId::new(&t.as_borrowed())) - .collect(); - let mut tasks = py_tasks.0.borrow_mut(); - tasks.add_get(output, inputs); + let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect(); + py_tasks.borrow_mut().0.borrow_mut().add_get(output, inputs); } #[pyfunction] fn tasks_add_get_union( - py_tasks: &PyTasks, + py_tasks: &Bound<'_, PyTasks>, output_type: &Bound<'_, PyType>, - input_types: Vec<&PyType>, - in_scope_types: Vec<&PyType>, + input_types: Vec>, + in_scope_types: Vec>, ) { let product = TypeId::new(output_type); - let input_types = input_types - .into_iter() - .map(|t| TypeId::new(&t.as_borrowed())) - .collect(); + let input_types = input_types.into_iter().map(|t| TypeId::new(&t)).collect(); let in_scope_types = in_scope_types .into_iter() - .map(|t| TypeId::new(&t.as_borrowed())) + .map(|t| TypeId::new(&t)) .collect(); - let mut tasks = py_tasks.0.borrow_mut(); - tasks.add_get_union(product, input_types, in_scope_types); + py_tasks + .borrow_mut() + .0 + .borrow_mut() + .add_get_union(product, input_types, in_scope_types); } #[pyfunction] -fn tasks_add_query(py_tasks: &PyTasks, output_type: &Bound<'_, PyType>, input_types: Vec<&PyType>) { +fn tasks_add_query( + py_tasks: &Bound<'_, PyTasks>, + output_type: &Bound<'_, PyType>, + input_types: Vec>, +) { let product = TypeId::new(output_type); - let params = input_types - .into_iter() - .map(|t| TypeId::new(&t.as_borrowed())) - .collect(); - let mut tasks = py_tasks.0.borrow_mut(); - tasks.query_add(product, params); + let params = input_types.into_iter().map(|t| TypeId::new(&t)).collect(); + py_tasks + .borrow_mut() + .0 + .borrow_mut() + .query_add(product, params); } #[pyfunction]