Skip to content

Commit

Permalink
PyO3: migrate to Bound in parts of externs/interface.rs (#21489)
Browse files Browse the repository at this point in the history
Migrate to the `pyo3::Bound` smart pointer in a portion of `src/rust/engine/src/externs/interface.rs`.
  • Loading branch information
tdyas authored Oct 4, 2024
1 parent a9f2bf4 commit 01ad15d
Showing 1 changed file with 92 additions and 85 deletions.
177 changes: 92 additions & 85 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,17 +416,17 @@ struct PySession(Session);
impl PySession {
#[new]
fn __new__(
scheduler: &PyScheduler,
scheduler: &Bound<'_, PyScheduler>,
dynamic_ui: bool,
ui_use_prodash: bool,
max_workunit_level: u64,
build_id: String,
session_values: PyObject,
cancellation_latch: &PySessionCancellationLatch,
cancellation_latch: &Bound<'_, PySessionCancellationLatch>,
py: Python,
) -> PyO3Result<Self> {
let core = scheduler.0.core.clone();
let cancellation_latch = cancellation_latch.0.clone();
let core = scheduler.borrow().0.core.clone();
let cancellation_latch = cancellation_latch.borrow().0.clone();
let py_level: PythonLogLevel = max_workunit_level
.try_into()
.map_err(|e| PyException::new_err(format!("{e}")))?;
Expand Down Expand Up @@ -575,7 +575,7 @@ impl PyThreadLocals {
#[pymethods]
impl PyThreadLocals {
#[classmethod]
fn get_for_current_thread(_cls: &PyType) -> Self {
fn get_for_current_thread(_cls: &Bound<'_, PyType>) -> Self {
Self::get()
}

Expand Down Expand Up @@ -635,10 +635,10 @@ fn nailgun_server_create(
#[pyfunction]
fn nailgun_server_await_shutdown(
py: Python,
nailgun_server_ptr: &PyNailgunServer,
nailgun_server_ptr: &Bound<'_, PyNailgunServer>,
) -> PyO3Result<()> {
if let Some(server) = nailgun_server_ptr.server.borrow_mut().take() {
let executor = nailgun_server_ptr.executor.clone();
if let Some(server) = nailgun_server_ptr.borrow().server.borrow_mut().take() {
let executor = nailgun_server_ptr.borrow().executor.clone();
py.allow_threads(|| executor.block_on(server.shutdown()))
.map_err(PyException::new_err)
} else {
Expand Down Expand Up @@ -698,34 +698,36 @@ fn hash_prefix_zero_bits(item: &str) -> u32 {
///
#[pyfunction]
fn scheduler_create(
py_executor: &externs::scheduler::PyExecutor,
py_tasks: &PyTasks,
types_ptr: &PyTypes,
py_executor: &Bound<'_, externs::scheduler::PyExecutor>,
py_tasks: &Bound<'_, PyTasks>,
types_ptr: &Bound<'_, PyTypes>,
build_root: PathBuf,
local_execution_root_dir: PathBuf,
named_caches_dir: PathBuf,
ignore_patterns: Vec<String>,
use_gitignore: bool,
watch_filesystem: bool,
remoting_options: &PyRemotingOptions,
local_store_options: &PyLocalStoreOptions,
exec_strategy_opts: &PyExecutionStrategyOptions,
remoting_options: &Bound<'_, PyRemotingOptions>,
local_store_options: &Bound<'_, PyLocalStoreOptions>,
exec_strategy_opts: &Bound<'_, PyExecutionStrategyOptions>,
ca_certs_path: Option<PathBuf>,
) -> PyO3Result<PyScheduler> {
match fs::increase_limits() {
Ok(msg) => debug!("{}", msg),
Err(e) => warn!("{}", e),
}
let types = types_ptr
.borrow()
.0
.borrow_mut()
.take()
.ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?;
let tasks = py_tasks.0.replace(Tasks::new());
let tasks = py_tasks.borrow().0.replace(Tasks::new());

// NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to
// use `tokio::spawn` since Python does not setup Tokio for the main thread. This also
// ensures that the correct executor is used by those libraries.
let py_executor = py_executor.borrow();
let core = py_executor
.0
.enter(|| {
Expand All @@ -741,9 +743,9 @@ fn scheduler_create(
local_execution_root_dir,
named_caches_dir,
ca_certs_path,
local_store_options.0.clone(),
remoting_options.0.clone(),
exec_strategy_opts.0.clone(),
local_store_options.borrow().0.clone(),
remoting_options.borrow().0.clone(),
exec_strategy_opts.borrow().0.clone(),
)
.await
})
Expand Down Expand Up @@ -1026,18 +1028,17 @@ fn session_poll_workunits(
#[pyfunction]
fn session_run_interactive_process(
py: Python,
py_session: &PySession,
py_session: &Bound<'_, PySession>,
interactive_process: PyObject,
process_config_from_environment: PyProcessExecutionEnvironment,
) -> PyO3Result<PyObject> {
let core = py_session.0.core();
let context = py_session
.0
.core()
.graph
.context(SessionCore::new(py_session.0.clone()));
let core = py_session.borrow().0.core().clone();
let session = &py_session.borrow().0;
let context = core.graph.context(SessionCore::new(session.clone()));

let interactive_process: Value = interactive_process.into();
let process_config = Value::new(process_config_from_environment.into_py(py));

py.allow_threads(|| {
core.executor.clone().block_on(nodes::task_context(
context.clone(),
Expand All @@ -1053,34 +1054,41 @@ fn session_run_interactive_process(
#[pyfunction]
fn scheduler_metrics<'py>(
py: Python<'py>,
py_scheduler: &'py PyScheduler,
py_session: &'py PySession,
) -> HashMap<&'py str, i64> {
py_scheduler
.0
.core
.executor
.enter(|| py.allow_threads(|| py_scheduler.0.metrics(&py_session.0)))
py_scheduler: &Bound<'py, PyScheduler>,
py_session: &Bound<'py, PySession>,
) -> HashMap<String, i64> {
let core = py_session.borrow().0.core().clone();
let scheduler = &py_scheduler.borrow().0;
let session = &py_session.borrow().0;

core.executor.enter(|| {
py.allow_threads(|| {
let result = scheduler.metrics(session);
result.into_iter().map(|(k, v)| (k.to_owned(), v)).collect()
})
})
}

#[pyfunction]
fn scheduler_live_items<'py>(
py: Python<'py>,
py_scheduler: &'py PyScheduler,
py_session: &'py PySession,
py_scheduler: &Bound<'py, PyScheduler>,
py_session: &Bound<'_, PySession>,
) -> (Vec<PyObject>, HashMap<&'static str, (usize, usize)>) {
let (items, sizes) = py_scheduler
.0
.core
let core = py_scheduler.borrow().0.core.clone();
let scheduler = &py_scheduler.borrow().0;
let session = &py_session.borrow().0;

let (items, sizes) = core
.executor
.enter(|| py.allow_threads(|| py_scheduler.0.live_items(&py_session.0)));
.enter(|| py.allow_threads(|| scheduler.live_items(session)));
let py_items = items.into_iter().map(|value| value.to_object(py)).collect();
(py_items, sizes)
}

#[pyfunction]
fn scheduler_shutdown(py: Python, py_scheduler: &PyScheduler, timeout_secs: u64) {
let core = &py_scheduler.0.core;
fn scheduler_shutdown(py: Python, py_scheduler: &Bound<'_, PyScheduler>, timeout_secs: u64) {
let core = py_scheduler.borrow().0.core.clone();
core.executor.enter(|| {
py.allow_threads(|| {
core.executor
Expand Down Expand Up @@ -1144,11 +1152,11 @@ fn execution_add_root_select(

#[pyfunction]
fn tasks_task_begin(
py_tasks: &PyTasks,
py_tasks: &Bound<'_, PyTasks>,
func: PyObject,
output_type: &Bound<'_, PyType>,
arg_types: Vec<(String, &PyType)>,
masked_types: Vec<&PyType>,
arg_types: Vec<(String, Bound<'_, PyType>)>,
masked_types: Vec<Bound<'_, PyType>>,
side_effecting: bool,
engine_aware_return_type: bool,
cacheable: bool,
Expand All @@ -1165,12 +1173,8 @@ fn tasks_task_begin(
.into_iter()
.map(|(name, typ)| (name, TypeId::new(&typ.as_borrowed())))
.collect();
let masked_types = masked_types
.into_iter()
.map(|t| TypeId::new(&t.as_borrowed()))
.collect();
let mut tasks = py_tasks.0.borrow_mut();
tasks.task_begin(
let masked_types = masked_types.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks.borrow_mut().0.borrow_mut().task_begin(
func,
output_type,
side_effecting,
Expand All @@ -1186,68 +1190,71 @@ fn tasks_task_begin(
}

#[pyfunction]
fn tasks_task_end(py_tasks: &PyTasks) {
let mut tasks = py_tasks.0.borrow_mut();
tasks.task_end();
fn tasks_task_end(py_tasks: &Bound<'_, PyTasks>) {
py_tasks.borrow_mut().0.borrow_mut().task_end();
}

#[pyfunction]
fn tasks_add_call(
py_tasks: &PyTasks,
py_tasks: &Bound<'_, PyTasks>,
output: &Bound<'_, PyType>,
inputs: Vec<&PyType>,
inputs: Vec<Bound<'_, PyType>>,
rule_id: String,
explicit_args_arity: u16,
) {
let output = TypeId::new(output);
let inputs = inputs
.into_iter()
.map(|t| TypeId::new(&t.as_borrowed()))
.collect();
let mut tasks = py_tasks.0.borrow_mut();
tasks.add_call(output, inputs, rule_id, explicit_args_arity);
let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks
.borrow_mut()
.0
.borrow_mut()
.add_call(output, inputs, rule_id, explicit_args_arity);
}

#[pyfunction]
fn tasks_add_get(py_tasks: &PyTasks, output: &Bound<'_, PyType>, inputs: Vec<&PyType>) {
fn tasks_add_get(
py_tasks: &Bound<'_, PyTasks>,
output: &Bound<'_, PyType>,
inputs: Vec<Bound<'_, PyType>>,
) {
let output = TypeId::new(output);
let inputs = inputs
.into_iter()
.map(|t| TypeId::new(&t.as_borrowed()))
.collect();
let mut tasks = py_tasks.0.borrow_mut();
tasks.add_get(output, inputs);
let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks.borrow_mut().0.borrow_mut().add_get(output, inputs);
}

#[pyfunction]
fn tasks_add_get_union(
py_tasks: &PyTasks,
py_tasks: &Bound<'_, PyTasks>,
output_type: &Bound<'_, PyType>,
input_types: Vec<&PyType>,
in_scope_types: Vec<&PyType>,
input_types: Vec<Bound<'_, PyType>>,
in_scope_types: Vec<Bound<'_, PyType>>,
) {
let product = TypeId::new(output_type);
let input_types = input_types
.into_iter()
.map(|t| TypeId::new(&t.as_borrowed()))
.collect();
let input_types = input_types.into_iter().map(|t| TypeId::new(&t)).collect();
let in_scope_types = in_scope_types
.into_iter()
.map(|t| TypeId::new(&t.as_borrowed()))
.map(|t| TypeId::new(&t))
.collect();
let mut tasks = py_tasks.0.borrow_mut();
tasks.add_get_union(product, input_types, in_scope_types);
py_tasks
.borrow_mut()
.0
.borrow_mut()
.add_get_union(product, input_types, in_scope_types);
}

#[pyfunction]
fn tasks_add_query(py_tasks: &PyTasks, output_type: &Bound<'_, PyType>, input_types: Vec<&PyType>) {
fn tasks_add_query(
py_tasks: &Bound<'_, PyTasks>,
output_type: &Bound<'_, PyType>,
input_types: Vec<Bound<'_, PyType>>,
) {
let product = TypeId::new(output_type);
let params = input_types
.into_iter()
.map(|t| TypeId::new(&t.as_borrowed()))
.collect();
let mut tasks = py_tasks.0.borrow_mut();
tasks.query_add(product, params);
let params = input_types.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks
.borrow_mut()
.0
.borrow_mut()
.query_add(product, params);
}

#[pyfunction]
Expand Down

0 comments on commit 01ad15d

Please sign in to comment.