Skip to content

Commit

Permalink
[dagster-pipes] start PipesLogReader right away (#25026)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Looks like there is no reason to have `PipesLogReader` startup be
dependent on `PipesMessageReader` startup.

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
danielgafni authored Oct 10, 2024
1 parent 348a979 commit 6dbd81a
Showing 1 changed file with 5 additions and 16 deletions.
21 changes: 5 additions & 16 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,20 +439,6 @@ def _logs_thread(
is_session_closed: Event,
messages_thread: Thread,
) -> None:
# Start once we have received the opened message from the external process
while True:
if self.opened_payload is not None:
break
# We never received the `opened` message and never will, so don't try to start the log
# reader threads.
elif not messages_thread.is_alive():
return
time.sleep(DEFAULT_SLEEP_INTERVAL)

# Logs are started with a merge of the params generated by the message reader and the opened
# payload.
log_params = {**params, **self.opened_payload}

wait_for_logs_start = None

# Loop over all log readers and start them if the target is readable, which typically means
Expand All @@ -463,16 +449,19 @@ def _logs_thread(
unstarted_log_readers = {**self.log_readers}

while True:
if self.opened_payload is not None:
params = {**params, **self.opened_payload}

# periodically check for new readers which may be added after the
# external process has started and add them to the unstarted log readers
for key in self.log_readers:
if key not in unstarted_log_readers:
unstarted_log_readers[key] = self.log_readers[key]

for key in list(unstarted_log_readers.keys()).copy():
if unstarted_log_readers[key].target_is_readable(log_params):
if unstarted_log_readers[key].target_is_readable(params):
reader = unstarted_log_readers.pop(key)
reader.start(log_params, is_session_closed)
reader.start(params, is_session_closed)

# In some cases logs might not be written out until after the external process has
# exited. That will leave us in this state, where some log readers have not been
Expand Down

0 comments on commit 6dbd81a

Please sign in to comment.