diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..c712d25 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,2 @@ +[run] +omit = tests/* diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e5ac366..506bc99 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -64,4 +64,4 @@ jobs: pip install -r requirements.txt - name: run tests with pytest run: | - pytest + pytest --cov-config=.coveragerc --cov=. tests diff --git a/display.py b/display.py index 966831f..e2c9f46 100644 --- a/display.py +++ b/display.py @@ -20,16 +20,49 @@ from src.tjc import EventType from src.response_actions import response_actions, input_actions, custom_touch_actions from src.lib_col_pic import parse_thumbnail -from src.elegoo_neptune4 import MODEL_N4_REGULAR, MODEL_N4_PRO, MODEL_N4_PLUS, MODEL_N4_MAX, Neptune4DisplayCommunicator -from src.mapping import PAGE_SHUTDOWN_DIALOG, build_format_filename, filename_regex_wrapper, PAGE_MAIN, PAGE_FILES, PAGE_PREPARE_MOVE, PAGE_PREPARE_TEMP, PAGE_PREPARE_EXTRUDER, PAGE_SETTINGS_TEMPERATURE_SET, PAGE_SETTINGS_ABOUT, PAGE_LEVELING, PAGE_LEVELING_SCREW_ADJUST, PAGE_LEVELING_Z_OFFSET_ADJUST, PAGE_CONFIRM_PRINT, PAGE_PRINTING, PAGE_PRINTING_KAMP, PAGE_PRINTING_PAUSE, PAGE_PRINTING_STOP, PAGE_PRINTING_EMERGENCY_STOP, PAGE_PRINTING_COMPLETE, PAGE_PRINTING_FILAMENT, PAGE_PRINTING_SPEED, PAGE_PRINTING_ADJUST, PAGE_PRINTING_DIALOG_SPEED, PAGE_PRINTING_DIALOG_FLOW, PAGE_OVERLAY_LOADING, format_time -from src.colors import BACKGROUND_DIALOG, BACKGROUND_GRAY, BACKGROUND_SUCCESS, BACKGROUND_WARNING, TEXT_SUCCESS, TEXT_ERROR -from src.wifi import get_wlan0_status +from src.communicator import DisplayCommunicator +from src.elegoo_neptune4 import ( + MODEL_N4_REGULAR, + MODEL_N4_PRO, + MODEL_N4_PLUS, + MODEL_N4_MAX, + MODELS_N4, + Neptune4DisplayCommunicator, +) +from src.elegoo_neptune3 import MODELS_N3, Neptune3DisplayCommunicator +from src.elegoo_custom import MODEL_CUSTOM, CustomDisplayCommunicator +from src.mapping import ( + build_format_filename, + filename_regex_wrapper, + PAGE_MAIN, + PAGE_FILES, + PAGE_PREPARE_MOVE, + PAGE_PREPARE_TEMP, + PAGE_PREPARE_EXTRUDER, + PAGE_SETTINGS_TEMPERATURE_SET, + PAGE_LEVELING, + PAGE_LEVELING_SCREW_ADJUST, + PAGE_LEVELING_Z_OFFSET_ADJUST, + PAGE_CONFIRM_PRINT, + PAGE_PRINTING, + PAGE_PRINTING_KAMP, + PAGE_PRINTING_PAUSE, + PAGE_PRINTING_STOP, + PAGE_PRINTING_EMERGENCY_STOP, + PAGE_PRINTING_COMPLETE, + PAGE_PRINTING_FILAMENT, + PAGE_PRINTING_SPEED, + PAGE_PRINTING_ADJUST, + PAGE_OVERLAY_LOADING, + format_time, +) +from src.colors import BACKGROUND_SUCCESS, BACKGROUND_WARNING log_file = os.path.expanduser("~/printer_data/logs/display_connector.log") logger = logging.getLogger(__name__) ch_log = logging.StreamHandler(sys.stdout) ch_log.setLevel(logging.DEBUG) -formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") ch_log.setFormatter(formatter) logger.addHandler(ch_log) file_log = logging.FileHandler(log_file) @@ -58,46 +91,54 @@ PAGE_PREPARE_TEMP, PAGE_PRINTING_ADJUST, PAGE_PRINTING_FILAMENT, - PAGE_PRINTING_SPEED + PAGE_PRINTING_SPEED, ] -TRANSITION_PAGES = [ - PAGE_OVERLAY_LOADING -] +TRANSITION_PAGES = [PAGE_OVERLAY_LOADING] + +SUPPORTED_PRINTERS = [MODEL_N4_REGULAR, MODEL_N4_PRO, MODEL_N4_PLUS, MODEL_N4_MAX] + + +def get_communicator_for_model(model) -> DisplayCommunicator: + if model == MODEL_CUSTOM: + return CustomDisplayCommunicator + elif model in MODELS_N4: + return Neptune4DisplayCommunicator + elif model in MODELS_N3: + return Neptune3DisplayCommunicator -SUPPORTED_PRINTERS = [ - MODEL_N4_REGULAR, - MODEL_N4_PRO, - MODEL_N4_PLUS, - MODEL_N4_MAX -] SOCKET_LIMIT = 20 * 1024 * 1024 + + class DisplayController: last_config_change = 0 + filament_sensor_name = "filament_sensor" + def __init__(self, config): self.config = config - self.display_name_override = None - self.display_name_line_color = None - self.z_display = "mm" self._handle_config() self.connected = False - - self.display = Neptune4DisplayCommunicator(logger, self.get_printer_model(), event_handler=self.display_event_handler) - self.display.mapper.set_z_display(self.z_display) + printer_model = self.get_printer_model() + self.display = get_communicator_for_model(printer_model)( + logger, + printer_model, + event_handler=self.display_event_handler, + port=self.config.safe_get("general", "serial_port"), + ) + self._handle_display_config() self.part_light_state = False self.frame_light_state = False self.fan_state = False self.filament_sensor_state = False - self.is_blocking_serial = False - self.move_distance = '1' + self.move_distance = "1" self.xy_move_speed = 130 self.z_move_speed = 10 - self.z_offset_distance = '0.01' + self.z_offset_distance = "0.01" self.out_fd = sys.stdout.fileno() os.set_blocking(self.out_fd, False) self.pending_req = {} @@ -113,15 +154,11 @@ def __init__(self, config): self.printing_target_temps = { "extruder": 0, "heater_bed": 0, - "heater_bed_outer": 0 + "heater_bed_outer": 0, } self.printing_selected_temp_increment = "10" self.printing_selected_speed_type = "print" - self.printing_target_speeds = { - "print": 1.0, - "flow": 1.0, - "fan": 1.0 - } + self.printing_target_speeds = {"print": 1.0, "flow": 1.0, "fan": 1.0} self.printing_selected_speed_increment = "10" self.extrude_amount = 50 @@ -132,8 +169,6 @@ def __init__(self, config): self.temperature_preset_extruder = 0 self.temperature_preset_bed = 0 - self.ips = "--" - self.leveling_mode = None self.screw_probe_count = 0 self.screw_levels = {} @@ -145,7 +180,6 @@ def __init__(self, config): self.full_bed_leveling_counts = [0, 0] self.bed_leveling_counts = [0, 0] self.bed_leveling_probed_count = 0 - self.bed_leveling_box_size = 0 self.bed_leveling_last_position = None self.klipper_restart_event = asyncio.Event() @@ -164,25 +198,18 @@ def _handle_config(self): logger.info("Loading config") if "general" in self.config: if "clean_filename_regex" in self.config["general"]: - filename_regex_wrapper["default"] = re.compile(self.config["general"]["clean_filename_regex"]) + filename_regex_wrapper["default"] = re.compile( + self.config["general"]["clean_filename_regex"] + ) + if "filament_sensor_name" in self.config["general"]: + self.filament_sensor_name = self.config["general"][ + "filament_sensor_name" + ] if "LOGGING" in self.config: - if "file_log_level" in self.config["LOGGING"]: - file_log.setLevel( self.config["LOGGING"]["file_log_level"]) + if "file_log_level" in self.config["LOGGING"]: + file_log.setLevel(self.config["LOGGING"]["file_log_level"]) logger.setLevel(logging.DEBUG) - - if "main_screen" in self.config: - if "display_name" in self.config["main_screen"]: - self.display_name_override = self.config["main_screen"]["display_name"] - if "display_name_line_color" in self.config["main_screen"]: - self.display_name_line_color = self.config["main_screen"]["display_name_line_color"] - - if "print_screen" in self.config: - if "z_display" in self.config["print_screen"]: - self.z_display = self.config["print_screen"]["z_display"] - if "clean_filename_regex" in self.config["print_screen"]: - filename_regex_wrapper["printing"] = re.compile(self.config["print_screen"]["clean_filename_regex"]) - if "prepare" in self.config: prepare = self.config["prepare"] if "move_distance" in prepare: @@ -194,15 +221,36 @@ def _handle_config(self): self.extrude_amount = prepare.getint("extrude_amount", fallback=50) self.extrude_speed = prepare.getint("extrude_speed", fallback=5) + def _handle_display_config(self): + self.display.mapper.set_filament_sensor_name(self.filament_sensor_name) + if "main_screen" in self.config: + if "display_name" in self.config["main_screen"]: + self.display.display_name_override = self.config["main_screen"][ + "display_name" + ] + if "display_name_line_color" in self.config["main_screen"]: + self.display.display_name_line_color = self.config["main_screen"][ + "display_name_line_color" + ] + if "print_screen" in self.config: + if "z_display" in self.config["print_screen"]: + self.display.mapper.set_z_display( + self.config["print_screen"]["z_display"] + ) + if "clean_filename_regex" in self.config["print_screen"]: + filename_regex_wrapper["printing"] = re.compile( + self.config["print_screen"]["clean_filename_regex"] + ) + def get_printer_model(self): if "general" in self.config: if "printer_model" in self.config["general"]: return self.config["general"]["printer_model"] try: - with open('/boot/.OpenNept4une.txt', 'r') as file: + with open("/boot/.OpenNept4une.txt", "r") as file: for line in file: if line.startswith(tuple(SUPPORTED_PRINTERS)): - model_part = line.split('-')[0].strip() + model_part = line.split("-")[0].strip() return model_part except FileNotFoundError: logger.error("File not found") @@ -210,121 +258,77 @@ def get_printer_model(self): logger.error(f"Error reading file: {e}") return None - def get_device_name(self): - model_map = { - MODEL_N4_REGULAR: "Neptune 4", - MODEL_N4_PRO: "Neptune 4 Pro", - MODEL_N4_PLUS: "Neptune 4 Plus", - MODEL_N4_MAX: "Neptune 4 Max", - } - return model_map[self.display.model] - - def initialize_display(self): - self._write("sendxy=1") - model_image_key = None - if self.display.model == MODEL_N4_REGULAR: - model_image_key = "213" - elif self.display.model == MODEL_N4_PRO: - model_image_key = "214" - self._write(f'p[{self._page_id(PAGE_MAIN)}].disp_q5.val=1') # N4Pro Outer Bed Symbol (Bottom Rig> - elif self.display.model == MODEL_N4_PLUS: - model_image_key = "313" - elif self.display.model == MODEL_N4_MAX: - model_image_key = "314" - - if self.display_name_override is None: - self._write(f'p[{self._page_id(PAGE_MAIN)}].q4.picc={model_image_key}') - else: - self._write(f'p[{self._page_id(PAGE_MAIN)}].q4.picc=137') - - self._write(f'p[{self._page_id(PAGE_SETTINGS_ABOUT)}].b[8].txt="{self.get_device_name()}"') - - async def send_gcodes_async(self, gcodes): - for gcode in gcodes: - logger.debug("Sending GCODE: " + gcode) - await self._send_moonraker_request("printer.gcode.script", {"script": gcode}) - await asyncio.sleep(0.1) - - def send_gcode(self, gcode): - logger.debug("Sending GCODE: " + gcode) - self._loop.create_task(self._send_moonraker_request("printer.gcode.script", {"script": gcode})) - - def move_axis(self, axis, distance): - speed = self.xy_move_speed if axis in ["X", "Y"] else self.z_move_speed - self.send_gcode(f'G91\nG1 {axis}{distance} F{int(speed) * 60}\nG90') - - async def special_page_handling(self): - current_page = self._get_current_page() - if current_page == PAGE_MAIN: - has_wifi = self.update_wifi_ui() - if self.display.model == MODEL_N4_PRO: - self._write('vis out_bedtemp,1') - if self.display_name_override: - display_name = self.display_name_override - if display_name == "MODEL_NAME": - display_name = self.get_device_name() - self._write('xstr 12,20,180,20,1,65535,' + str(BACKGROUND_GRAY) + ',0,1,1,"' + display_name + '"') - if self.display_name_line_color: - self._write('fill 13,47,24,4,' + str(self.display_name_line_color)) - - self._write(f'xpic {200 if has_wifi else 230},16,30,30,220,200,51') - - elif current_page == PAGE_FILES: - self.show_files_page() + async def special_page_handling(self, current_page): + if current_page == PAGE_FILES: + await self.display.show_files_page( + self.current_dir, self.dir_contents, self.files_page + ) elif current_page == PAGE_PREPARE_MOVE: - self.update_prepare_move_ui() + await self.display.update_prepare_move_ui(self.move_distance) elif current_page == PAGE_PREPARE_EXTRUDER: - self.update_prepare_extrude_ui() + await self.display.update_prepare_extrude_ui( + self.extrude_amount, self.extrude_speed + ) elif current_page == PAGE_SETTINGS_TEMPERATURE_SET: - self.update_preset_temp_ui() - elif current_page == PAGE_SETTINGS_ABOUT: - self._write('fill 0,400,320,60,' + str(BACKGROUND_GRAY)) - self._write('xstr 0,400,320,30,1,65535,' + str(BACKGROUND_GRAY) + ',1,1,1,"OpenNept4une"') - self._write('xstr 0,430,320,30,2,GRAY,' + str(BACKGROUND_GRAY) + ',1,1,1,"github.com/OpenNeptune3D"') + await self.display.update_preset_temp_ui( + self.temperature_preset_step, + self.temperature_preset_extruder, + self.temperature_preset_bed, + ) elif current_page == PAGE_CONFIRM_PRINT: self._loop.create_task(self.set_data_prepare_screen(self.current_filename)) - elif current_page == PAGE_PRINTING: - self._write("printvalue.xcen=0") - self._write("move printvalue,13,267,13,267,0,10") - self._write("vis b[16],0") - elif current_page == PAGE_PRINTING_COMPLETE: - self._write('b[4].txt="Print Completed!"') - self._write(f'b[3].txt="{build_format_filename()(self.current_filename)}"') - self._write(f'b[5].txt="Time: {format_time(self.current_print_duration)}"') elif current_page == PAGE_PRINTING_FILAMENT: - self.update_printing_heater_settings_ui() - self.update_printing_temperature_increment_ui() + await self.display.update_printing_heater_settings_ui( + self.printing_selected_heater, + self.printing_target_temps[self.printing_selected_heater], + ) + await self.display.update_printing_temperature_increment_ui( + self.printing_selected_temp_increment + ) elif current_page == PAGE_PRINTING_ADJUST: - self.update_printing_zoffset_increment_ui() - self._write("b[20].txt=\"" + self.ips + "\"") + await self.display.update_printing_zoffset_increment_ui( + self.z_offset_distance + ) elif current_page == PAGE_PRINTING_SPEED: - self.update_printing_speed_settings_ui() - self.update_printing_speed_increment_ui() + await self.display.update_printing_speed_settings_ui( + self.printing_selected_speed_type, + self.printing_target_speeds[self.printing_selected_speed_type], + ) + await self.display.update_printing_speed_increment_ui( + self.printing_selected_speed_increment + ) elif current_page == PAGE_LEVELING: - self._write("b[12].txt=\"Leveling\"") - self._write("b[18].txt=\"Screws Tilt Adjust\"") - self._write("b[19].txt=\"Z-Probe Offset\"") self.leveling_mode = None elif current_page == PAGE_LEVELING_SCREW_ADJUST: - self.draw_initial_screw_leveling() + await self.display.draw_initial_screw_leveling() self._loop.create_task(self.handle_screw_leveling()) elif current_page == PAGE_LEVELING_Z_OFFSET_ADJUST: - self.draw_initial_zprobe_leveling() + await self.display.draw_initial_zprobe_leveling() self._loop.create_task(self.handle_zprobe_leveling()) elif current_page == PAGE_PRINTING_KAMP: - self.draw_kamp_page() - elif current_page == PAGE_PRINTING_DIALOG_SPEED: - self._write("b[3].maxval=200") - elif current_page == PAGE_PRINTING_DIALOG_FLOW: - self._write("b[3].maxval=200") - elif current_page == PAGE_SHUTDOWN_DIALOG: - self._write("fill 20,100,232,240," + str(BACKGROUND_DIALOG)) - self._write("xstr 24,104,224,50,1,65535,10665,1,1,1,\"Shut Down Host\"") - self._write("xstr 24,158,224,50,1,65535,10665,1,1,1,\"Reboot Host\"") - self._write("xstr 24,212,224,50,1,65535,10665,1,1,1,\"Reboot Klipper\"") - self._write("xstr 24,286,224,50,1,65535,10665,1,1,1,\"Back\"") - - def _navigate_to_page(self, page, clear_history = False): + await self.display.draw_kamp_page(self.bed_leveling_counts) + + await self.display.special_page_handling(current_page) + + async def send_gcodes_async(self, gcodes): + for gcode in gcodes: + logger.debug("Sending GCODE: " + gcode) + await self._send_moonraker_request( + "printer.gcode.script", {"script": gcode} + ) + await asyncio.sleep(0.1) + + def send_gcode(self, gcode): + logger.debug("Sending GCODE: " + gcode) + self._loop.create_task( + self._send_moonraker_request("printer.gcode.script", {"script": gcode}) + ) + + def move_axis(self, axis, distance): + speed = self.xy_move_speed if axis in ["X", "Y"] else self.z_move_speed + self.send_gcode(f"G91\nG1 {axis}{distance} F{int(speed) * 60}\nG90") + + def _navigate_to_page(self, page, clear_history=False): if len(self.history) == 0 or self.history[-1] != page: if page in TABBED_PAGES and self.history[-1] in TABBED_PAGES: self.history[-1] = page @@ -332,28 +336,38 @@ def _navigate_to_page(self, page, clear_history = False): if clear_history: self.history = [] self.history.append(page) - self._write(f"page {self.display.mapper.map_page(page)}") + self._loop.create_task( + self.display.navigate_to(self.display.mapper.map_page(page)) + ) logger.debug(f"Navigating to {page}") - self._loop.create_task(self.special_page_handling()) + self._loop.create_task(self.special_page_handling(page)) def execute_action(self, action): if action.startswith("move_"): - parts = action.split('_') + parts = action.split("_") axis = parts[1].upper() direction = parts[2] self.move_axis(axis, direction + self.move_distance) elif action.startswith("set_distance_"): - parts = action.split('_') + parts = action.split("_") self.move_distance = parts[2] - self.update_prepare_move_ui() + self._loop.create_task( + self.display.update_prepare_move_ui(self.move_distance) + ) if action.startswith("zoffset_"): - parts = action.split('_') + parts = action.split("_") direction = parts[1] - self.send_gcode(f"SET_GCODE_OFFSET Z_ADJUST={direction}{self.z_offset_distance} MOVE=1") + self.send_gcode( + f"SET_GCODE_OFFSET Z_ADJUST={direction}{self.z_offset_distance} MOVE=1" + ) elif action.startswith("zoffsetchange_"): - parts = action.split('_') + parts = action.split("_") self.z_offset_distance = parts[1] - self.update_printing_zoffset_increment_ui() + self._loop.create_task( + self.display.update_printing_zoffset_increment_ui( + self.z_offset_distance + ) + ) elif action == "toggle_part_light": self.part_light_state = not self.part_light_state self._set_light("Part_Light", self.part_light_state) @@ -372,14 +386,18 @@ def execute_action(self, action): elif action == "go_back": self._go_back() elif action.startswith("page"): - self._navigate_to_page(action.split(' ')[1]) + self._navigate_to_page(action.split(" ")[1]) elif action == "emergency_stop": logger.info("Executing emergency stop!") - self._loop.create_task(self._send_moonraker_request("printer.emergency_stop")) + self._loop.create_task( + self._send_moonraker_request("printer.emergency_stop") + ) elif action == "pause_print_button": if self.current_state == "paused": logger.info("Resuming print") - self._loop.create_task(self._send_moonraker_request("printer.print.resume")) + self._loop.create_task( + self._send_moonraker_request("printer.print.resume") + ) else: self._go_back() self._navigate_to_page(PAGE_PRINTING_PAUSE) @@ -395,45 +413,95 @@ def execute_action(self, action): elif action == "files_picker": self._navigate_to_page(PAGE_FILES) self._loop.create_task(self._load_files()) + elif action.startswith("temp_heater_"): - parts = action.split('_') + parts = action.split("_") self.printing_selected_heater = "_".join(parts[2:]) - self.update_printing_heater_settings_ui() + self._loop.create_task( + self.display.update_printing_heater_settings_ui( + self.printing_selected_heater, + self.printing_target_temps[self.printing_selected_heater], + ) + ) elif action.startswith("temp_increment_"): - parts = action.split('_') + parts = action.split("_") self.printing_selected_temp_increment = parts[2] - self.update_printing_temperature_increment_ui() + self._loop.create_task( + self.display.update_printing_temperature_increment_ui( + self.printing_selected_temp_increment + ) + ) elif action.startswith("temp_adjust_"): - parts = action.split('_') + parts = action.split("_") direction = parts[2] current_temp = self.printing_target_temps[self.printing_selected_heater] - self.send_gcode("SET_HEATER_TEMPERATURE HEATER=" + self.printing_selected_heater + " TARGET=" + - str(current_temp + (int(direction + self.printing_selected_temp_increment) * (1 if direction == '+' else -1)))) + self.send_gcode( + "SET_HEATER_TEMPERATURE HEATER=" + + self.printing_selected_heater + + " TARGET=" + + str( + current_temp + + ( + int(self.printing_selected_temp_increment) + * (1 if direction == "+" else -1) + ) + ) + ) elif action == "temp_reset": - self.send_gcode("SET_HEATER_TEMPERATURE HEATER=" + self.printing_selected_heater + " TARGET=0") + self.send_gcode( + "SET_HEATER_TEMPERATURE HEATER=" + + self.printing_selected_heater + + " TARGET=0" + ) elif action.startswith("speed_type_"): - parts = action.split('_') + parts = action.split("_") self.printing_selected_speed_type = parts[2] - self.update_printing_speed_settings_ui() + self._loop.create_task( + self.display.update_printing_speed_settings_ui( + self.printing_selected_speed_type, + self.printing_target_speeds[self.printing_selected_speed_type], + ) + ) elif action.startswith("speed_increment_"): - parts = action.split('_') + parts = action.split("_") self.printing_selected_speed_increment = parts[2] - self.update_printing_speed_increment_ui() + self._loop.create_task( + self.display.update_printing_speed_increment_ui( + self.printing_selected_speed_increment + ) + ) elif action.startswith("speed_adjust_"): - parts = action.split('_') + parts = action.split("_") direction = parts[2] - current_speed = self.printing_target_speeds[self.printing_selected_speed_type] - change = (int(self.printing_selected_speed_increment) * (1 if direction == '+' else -1)) - self.send_speed_update(self.printing_selected_speed_type, (current_speed + (change / 100.0)) * 100) + current_speed = self.printing_target_speeds[ + self.printing_selected_speed_type + ] + change = int(self.printing_selected_speed_increment) * ( + 1 if direction == "+" else -1 + ) + self.send_speed_update( + self.printing_selected_speed_type, + (current_speed + (change / 100.0)) * 100, + ) elif action == "speed_reset": self.send_speed_update(self.printing_selected_speed_type, 1.0) elif action.startswith("files_page_"): - parts = action.split('_') + parts = action.split("_") direction = parts[2] - self.files_page = int(max(0, min((len(self.dir_contents)/5), self.files_page + (1 if direction == 'next' else -1)))) - self.show_files_page() + self.files_page = int( + max( + 0, + min( + (len(self.dir_contents) / 5), + self.files_page + (1 if direction == "next" else -1), + ), + ) + ) + self._loop.create_task( + self.display.show_files_page(self.current_dir, self.dir_contents, self.files_page) + ) elif action.startswith("open_file_"): - parts = action.split('_') + parts = action.split("_") index = int(parts[2]) selected = self.dir_contents[(self.files_page * 5) + index] if selected["type"] == "dir": @@ -446,17 +514,23 @@ def execute_action(self, action): elif action == "print_opened_file": self._go_back() self._navigate_to_page(PAGE_OVERLAY_LOADING) - self._loop.create_task(self._send_moonraker_request("printer.print.start", {"filename": self.current_filename})) + self._loop.create_task( + self._send_moonraker_request( + "printer.print.start", {"filename": self.current_filename} + ) + ) elif action == "confirm_complete": logger.info("Clearing SD Card") self.send_gcode("SDCARD_RESET_FILE") elif action.startswith("set_temp"): - parts = action.split('_') + parts = action.split("_") target = parts[-1] heater = "_".join(parts[2:-1]) - self.send_gcode("SET_HEATER_TEMPERATURE HEATER=" + heater + " TARGET=" + target) + self.send_gcode( + "SET_HEATER_TEMPERATURE HEATER=" + heater + " TARGET=" + target + ) elif action.startswith("set_preset_temp"): - parts = action.split('_') + parts = action.split("_") material = parts[3].lower() if "temperatures." + material in self.config: @@ -467,55 +541,85 @@ def execute_action(self, action): heater_bed = TEMP_DEFAULTS[material][1] gcodes = [ f"SET_HEATER_TEMPERATURE HEATER=extruder TARGET={extruder}", - f"SET_HEATER_TEMPERATURE HEATER=heater_bed TARGET={heater_bed}" + f"SET_HEATER_TEMPERATURE HEATER=heater_bed TARGET={heater_bed}", ] if self.display.model == MODEL_N4_PRO: - gcodes.append(f"SET_HEATER_TEMPERATURE HEATER=heater_bed_outer TARGET={heater_bed}") + gcodes.append( + f"SET_HEATER_TEMPERATURE HEATER=heater_bed_outer TARGET={heater_bed}" + ) self._loop.create_task(self.send_gcodes_async(gcodes)) elif action.startswith("set_extrude_amount"): - self.extrude_amount = int(action.split('_')[3]) - self.update_prepare_extrude_ui() + self.extrude_amount = int(action.split("_")[3]) + self._loop.create_task( + self.display.update_prepare_extrude_ui(self.extrude_amount) + ) elif action.startswith("set_extrude_speed"): - self.extrude_speed = int(action.split('_')[3]) - self.update_prepare_extrude_ui() + self.extrude_speed = int(action.split("_")[3]) + self._loop.create_task( + self.display.update_prepare_extrude_ui(self.extrude_speed) + ) elif action.startswith("extrude_"): - parts = action.split('_') + parts = action.split("_") direction = parts[1] - self.send_gcode(f"M83\nG1 E{direction}{self.extrude_amount} F{self.extrude_speed * 60}") + self.send_gcode( + f"M83\nG1 E{direction}{self.extrude_amount} F{self.extrude_speed * 60}" + ) elif action.startswith("start_temp_preset_"): - material = action.split('_')[3] + material = action.split("_")[3] self.temperature_preset_material = material if "temperatures." + material in self.config: - self.temperature_preset_extruder = int(self.config["temperatures." + material]["extruder"]) - self.temperature_preset_bed = int(self.config["temperatures." + material]["heater_bed"]) + self.temperature_preset_extruder = int( + self.config["temperatures." + material]["extruder"] + ) + self.temperature_preset_bed = int( + self.config["temperatures." + material]["heater_bed"] + ) else: self.temperature_preset_extruder = TEMP_DEFAULTS[material][0] self.temperature_preset_bed = TEMP_DEFAULTS[material][1] self._navigate_to_page(PAGE_SETTINGS_TEMPERATURE_SET) elif action.startswith("preset_temp_step_"): - size = int(action.split('_')[3]) + size = int(action.split("_")[3]) self.temperature_preset_step = size elif action.startswith("preset_temp_"): - parts = action.split('_') + parts = action.split("_") heater = parts[2] - change = self.temperature_preset_step if parts[3] == "up" else -self.temperature_preset_step + change = ( + self.temperature_preset_step + if parts[3] == "up" + else -self.temperature_preset_step + ) if heater == "extruder": self.temperature_preset_extruder += change else: self.temperature_preset_bed += change - self.update_preset_temp_ui() + self._loop.create_task( + self.display.update_preset_temp_ui( + self.temperature_preset_step, + self.temperature_preset_extruder, + self.temperature_preset_bed, + ) + ) elif action == "save_temp_preset": logger.info("Saving temp preset") self.save_temp_preset() elif action == "retry_screw_leveling": - self.draw_initial_screw_leveling() + self._loop.create_task(self.display.draw_initial_screw_leveling()) self._loop.create_task(self.handle_screw_leveling()) + elif action == "begin_full_bed_level": + self.leveling_mode = "full_bed" + self._navigate_to_page(PAGE_PRINTING_KAMP) + self.send_gcode("AUTO_FULL_BED_LEVEL") elif action.startswith("zprobe_step_"): - parts = action.split('_') + parts = action.split("_") self.z_probe_step = parts[2] - self.update_zprobe_leveling_ui() + self._loop.create_task( + self.display.update_zprobe_leveling_ui( + self.z_probe_step, self.z_probe_distance + ) + ) elif action.startswith("zprobe_"): - parts = action.split('_') + parts = action.split("_") direction = parts[1] self.send_gcode(f"TESTZ Z={direction}{self.z_probe_step}") elif action == "abort_zprobe": @@ -525,12 +629,15 @@ def execute_action(self, action): self.send_gcode("ACCEPT") self.send_gcode("SAVE_CONFIG") self._go_back() + elif action == "save_config": + self.send_gcode("SAVE_CONFIG") + self._go_back() elif action.startswith("set_speed_"): - parts = action.split('_') + parts = action.split("_") speed = int(parts[2]) self.send_speed_update("print", speed) elif action.startswith("set_flow_"): - parts = action.split('_') + parts = action.split("_") speed = int(parts[2]) self.send_speed_update("flow", speed) elif action == "reboot_host": @@ -540,96 +647,60 @@ def execute_action(self, action): self._loop.create_task(self._send_moonraker_request("machine.reboot")) elif action == "shutdown_host": logger.info("Shutting down Host") - self._go_back() + self.display.show_shutdown_screens() self._loop.create_task(self._send_moonraker_request("machine.shutdown")) elif action == "reboot_klipper": logger.info("Rebooting Klipper") - self._loop.create_task(self._send_moonraker_request("machine.services.restart", {"service": "klipper"})) + self._loop.create_task( + self._send_moonraker_request( + "machine.services.restart", {"service": "klipper"} + ) + ) self._go_back() self._navigate_to_page(PAGE_OVERLAY_LOADING) - def _write(self, data, forced = False): - if self.is_blocking_serial and not forced: - return - self._loop.create_task(self.display.write(data)) - def _set_light(self, light_name, new_state): gcode = f"{light_name}_{'ON' if new_state else 'OFF'}" self.send_gcode(gcode) def _toggle_filament_sensor(self, state): - gcode = f"SET_FILAMENT_SENSOR SENSOR=fila ENABLE={'1' if state else '0'}" + gcode = f"SET_FILAMENT_SENSOR SENSOR={self.filament_sensor_name} ENABLE={'1' if state else '0'}" self.send_gcode(gcode) def save_temp_preset(self): if "temperatures." + self.temperature_preset_material not in self.config: self.config["temperatures." + self.temperature_preset_material] = {} - self.config.set("temperatures." + self.temperature_preset_material, "extruder", str(self.temperature_preset_extruder)) - self.config.set("temperatures." + self.temperature_preset_material, "heater_bed", str(self.temperature_preset_bed)) + self.config.set( + "temperatures." + self.temperature_preset_material, + "extruder", + str(self.temperature_preset_extruder), + ) + self.config.set( + "temperatures." + self.temperature_preset_material, + "heater_bed", + str(self.temperature_preset_bed), + ) self.config.write_changes() self._go_back() - def update_printing_heater_settings_ui(self): - if self.display.model == MODEL_N4_PRO: - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].b0.picc=' + str(90 if self.printing_selected_heater == "extruder" else 89)) - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].b1.picc=' + str(90 if self.printing_selected_heater == "heater_bed" else 89)) - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].b2.picc=' + str(90 if self.printing_selected_heater == "heater_bed_outer" else 89)) - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].targettemp.val=' + str(self.printing_target_temps[self.printing_selected_heater])) - - else: - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].b[13].pic={54 + ["extruder", "heater_bed"].index(self.printing_selected_heater)}') - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].b[35].txt="' + str(self.printing_target_temps[self.printing_selected_heater]) + '"') - - - def update_printing_temperature_increment_ui(self): - self._write(f'p[{self._page_id(PAGE_PRINTING_FILAMENT)}].p1.pic={56 + ["1", "5", "10"].index(self.printing_selected_temp_increment)}') - - def update_printing_speed_settings_ui(self): - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].b0.picc=' + str(59 if self.printing_selected_speed_type == "print" else 58)) - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].b1.picc=' + str(59 if self.printing_selected_speed_type == "flow" else 58)) - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].b2.picc=' + str(59 if self.printing_selected_speed_type == "fan" else 58)) - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].targetspeed.val={self.printing_target_speeds[self.printing_selected_speed_type]*100:.0f}') - - def update_printing_speed_increment_ui(self): - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].b[14].picc=' + str(59 if self.printing_selected_speed_increment == "1" else 58)) - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].b[15].picc=' + str(59 if self.printing_selected_speed_increment == "5" else 58)) - self._write(f'p[{self._page_id(PAGE_PRINTING_SPEED)}].b[16].picc=' + str(59 if self.printing_selected_speed_increment == "10" else 58)) - - def update_printing_zoffset_increment_ui(self): - self._write(f'p[{self._page_id(PAGE_PRINTING_ADJUST)}].b[23].picc=' + str(36 if self.z_offset_distance == "0.01" else 65)) - self._write(f'p[{self._page_id(PAGE_PRINTING_ADJUST)}].b[24].picc=' + str(36 if self.z_offset_distance == "0.1" else 65)) - self._write(f'p[{self._page_id(PAGE_PRINTING_ADJUST)}].b[25].picc=' + str(36 if self.z_offset_distance == "1" else 65)) - - def update_preset_temp_ui(self): - self._write(f'p[{self._page_id(PAGE_SETTINGS_TEMPERATURE_SET)}].b[7].pic={56 + [1, 5, 10].index(self.temperature_preset_step)}') - self._write(f'p[{self._page_id(PAGE_SETTINGS_TEMPERATURE_SET)}].b[18].txt="{self.temperature_preset_extruder}"') - self._write(f'p[{self._page_id(PAGE_SETTINGS_TEMPERATURE_SET)}].b[19].txt="{self.temperature_preset_bed}"') - - def update_prepare_move_ui(self): - self._write(f'p[{self._page_id(PAGE_PREPARE_MOVE)}].p0.pic={10 + ["0.1", "1", "10"].index(self.move_distance)}') - - def update_prepare_extrude_ui(self): - self._write(f'p[{self._page_id(PAGE_PREPARE_EXTRUDER)}].b[8].txt="{self.extrude_amount}"') - self._write(f'p[{self._page_id(PAGE_PREPARE_EXTRUDER)}].b[9].txt="{self.extrude_speed}"') - - def update_wifi_ui(self): - has_wifi, ssid, rssi_category = get_wlan0_status() - if not has_wifi: - self._write(f'p[{self._page_id(PAGE_MAIN)}].b[0].pic=214') - return False - if ssid is None: - self._write(f'p[{self._page_id(PAGE_MAIN)}].b[0].pic=313') - else: - self._write(f'p[{self._page_id(PAGE_MAIN)}].b[0].pic={313 + rssi_category}') - return True - def send_speed_update(self, speed_type, new_speed): - if speed_type == "print": - self.send_gcode(f"M220 S{new_speed:.0f}") - elif speed_type == "flow": - self.send_gcode(f"M221 S{new_speed:.0f}") - elif speed_type == "fan": - self.send_gcode(f"SET_FAN_SPEED FAN=fan SPEED={new_speed}") + if new_speed != 1.0: + if speed_type == "print": + self.send_gcode(f"M220 S{new_speed:.0f}") + elif speed_type == "flow": + self.send_gcode(f"M221 S{new_speed:.0f}") + elif speed_type == "fan": + new_speed = int(new_speed) + value = min(max(((new_speed) / 100) * 255, 0), 255) + self.send_gcode(f"M106 S{value}") + else: + if speed_type == "print": + self.send_gcode("M220 S100") + elif speed_type == "flow": + self.send_gcode("M221 S100") + elif speed_type == "fan": + self.send_gcode("M106 S0") + #edited for more stable print interface def _toggle_fan(self, state): gcode = f"M106 S{'255' if state else '0'}" @@ -644,75 +715,65 @@ def _build_path(self, *components): return path[1:] def sort_dir_contents(self, dir_contents): - key = 'modified' + key = "modified" reverse = True - if 'files' in self.config: - files_config = self.config['files'] - if 'sort_by' in files_config: - key = files_config['sort_by'] - if 'sort_order' in files_config: - reverse = files_config['sort_order'] == 'desc' + if "files" in self.config: + files_config = self.config["files"] + if "sort_by" in files_config: + key = files_config["sort_by"] + if "sort_order" in files_config: + reverse = files_config["sort_order"] == "desc" return sorted(dir_contents, key=lambda k: k[key], reverse=reverse) async def _load_files(self): - data = (await self._send_moonraker_request("server.files.get_directory", {"path": "/".join(["gcodes", self.current_dir])})) + data = await self._send_moonraker_request( + "server.files.get_directory", + {"path": "/".join(["gcodes", self.current_dir])}, + ) dir_info = data["result"] self.dir_contents = [] dirs = [] for item in dir_info["dirs"]: if not item["dirname"].startswith("."): - dirs.append({ - "type": "dir", - "path": self._build_path(self.current_dir, item["dirname"]), - "size": item["size"], - "modified": item["modified"], - "name": item["dirname"] - }) + dirs.append( + { + "type": "dir", + "path": self._build_path(self.current_dir, item["dirname"]), + "size": item["size"], + "modified": item["modified"], + "name": item["dirname"], + } + ) files = [] for item in dir_info["files"]: if item["filename"].endswith(".gcode"): - files.append({ - "type": "file", - "path": self._build_path(self.current_dir, item["filename"]), - "size": item["size"], - "modified": item["modified"], - "name": build_format_filename()(item["filename"]) - }) + files.append( + { + "type": "file", + "path": self._build_path(self.current_dir, item["filename"]), + "size": item["size"], + "modified": item["modified"], + "name": build_format_filename()(item["filename"]), + } + ) sort_folders_first = True if "files" in self.config: - sort_folders_first = self.config["files"].getboolean("sort_folders_first", fallback=True) + sort_folders_first = self.config["files"].getboolean( + "sort_folders_first", fallback=True + ) if sort_folders_first: - self.dir_contents = self.sort_dir_contents(dirs) + self.sort_dir_contents(files) + self.dir_contents = self.sort_dir_contents(dirs) + self.sort_dir_contents( + files + ) else: self.dir_contents = self.sort_dir_contents(dirs + files) - self.show_files_page() + await self.display.show_files_page( + self.current_dir, self.dir_contents, self.files_page + ) def _page_id(self, page): return self.display.mapper.map_page(page) - def show_files_page(self): - page_size = 5 - title = self.current_dir.split("/")[-1] - if title == "": - title = "Files" - file_count = len(self.dir_contents) - if file_count == 0: - self._write(f'p[{self._page_id(PAGE_FILES)}].b[11].txt="{title} (Empty)"') - else: - self._write(f'p[{self._page_id(PAGE_FILES)}].b[11].txt="{title} ({(self.files_page * page_size) + 1}-{min((self.files_page * page_size) + page_size, file_count)}/{file_count})"') - component_index = 0 - for index in range(self.files_page * page_size, min(len(self.dir_contents), (self.files_page + 1) * page_size)): - file = self.dir_contents[index] - self._write(f'p[{self._page_id(PAGE_FILES)}].b[{component_index + 18}].txt="{file["name"]}"') - if file["type"] == "dir": - self._write(f'p[{self._page_id(PAGE_FILES)}].b[{component_index + 13}].pic=194') - else: - self._write(f'p[{self._page_id(PAGE_FILES)}].b[{component_index + 13}].pic=193') - component_index += 1 - for index in range(component_index, page_size): - self._write(f'p[{self._page_id(PAGE_FILES)}].b[{index + 13}].pic=195') - self._write(f'p[{self._page_id(PAGE_FILES)}].b[{index + 18}].txt=""') - def _go_back(self): if len(self.history) > 1: if self._get_current_page() == PAGE_FILES and self.current_dir != "": @@ -724,9 +785,11 @@ def _go_back(self): while len(self.history) > 1 and self.history[-1] in TRANSITION_PAGES: self.history.pop() back_page = self.history[-1] - self._write(f"page {self.display.mapper.map_page(back_page)}") + self._loop.create_task( + self.display.navigate_to(self.display.mapper.map_page(back_page)) + ) logger.debug(f"Navigating back to {back_page}") - self._loop.create_task(self.special_page_handling()) + self._loop.create_task(self.special_page_handling(back_page)) else: logger.debug("Already at the main page.") @@ -737,27 +800,37 @@ async def listen(self): await self.display.connect() await self.display.check_valid_version() await self.connect_moonraker() - ret = await self._send_moonraker_request("printer.objects.subscribe", {"objects": { - "gcode_move": ["extrude_factor", "speed_factor", "homing_origin"], - "motion_report": ["live_position", "live_velocity"], - "fan": ["speed"], - "heater_bed": ["temperature", "target"], - "extruder": ["temperature", "target"], - "heater_generic heater_bed_outer": ["temperature", "target"], - "display_status": ["progress"], - "print_stats": ["state", "print_duration", "filename", "total_duration", "info"], - "output_pin Part_Light": ["value"], - "output_pin Frame_Light": ["value"], - "configfile": ["config"], - "filament_switch_sensor fila": ["enabled"] - }}) + ret = await self._send_moonraker_request( + "printer.objects.subscribe", + { + "objects": { + "gcode_move": ["extrude_factor", "speed_factor", "homing_origin"], + "motion_report": ["live_position", "live_velocity"], + "fan": ["speed"], + "heater_bed": ["temperature", "target"], + "extruder": ["temperature", "target"], + "heater_generic heater_bed_outer": ["temperature", "target"], + "display_status": ["progress"], + "print_stats": [ + "state", + "print_duration", + "filename", + "total_duration", + "info", + ], + "output_pin Part_Light": ["value"], + "output_pin Frame_Light": ["value"], + "configfile": ["config"], + f"filament_switch_sensor {self.filament_sensor_name}": ["enabled"], + } + }, + ) data = ret["result"]["status"] - logger.info("Printer Model: " + str(self.get_device_name())) - self.initialize_display() + logger.info("Printer Model: " + str(self.display.get_device_name())) + await self.display.initialize_display() self.handle_status_update(data) - async def _send_moonraker_request( - self, method, params=None): + async def _send_moonraker_request(self, method, params=None): if params is None: params = {} message = self._make_rpc_msg(method, **params) @@ -788,23 +861,33 @@ async def connect_moonraker(self) -> None: logger.info(f"Connecting to Moonraker at {sockpath}") while True: try: - reader, writer = await asyncio.open_unix_connection(sockpath, limit=SOCKET_LIMIT) + reader, writer = await asyncio.open_unix_connection( + sockpath, limit=SOCKET_LIMIT + ) self.writer = writer self._loop.create_task(self._process_stream(reader)) self.connected = True logger.info("Connected to Moonraker") try: - software_version_response = await self._send_moonraker_request("printer.info") - software_version = software_version_response["result"]["software_version"] - software_version = "-".join(software_version.split("-")[:2]) # clean up version string + software_version_response = await self._send_moonraker_request( + "printer.info" + ) + software_version = software_version_response["result"][ + "software_version" + ] + software_version = "-".join( + software_version.split("-")[:2] + ) # clean up version string # Process the software_version... logger.info(f"Software Version: {software_version}") - self._write("p[" + self._page_id(PAGE_SETTINGS_ABOUT) + "].b[10].txt=\"" + software_version + "\"") + await self.display.update_klipper_version_ui(software_version) break except KeyError: - logger.error("KeyError encountered in software_version_response. Attempting to reconnect.") + logger.error( + "KeyError encountered in software_version_response. Attempting to reconnect." + ) await asyncio.sleep(5) # Wait before reconnecting continue # Retry the connection loop @@ -815,17 +898,23 @@ async def connect_moonraker(self) -> None: await asyncio.sleep(5) # Wait before reconnecting continue - ret = await self._send_moonraker_request("server.connection.identify", { + ret = await self._send_moonraker_request( + "server.connection.identify", + { "client_name": "OpenNept4une Display Connector", "version": "0.0.1", "type": "other", - "url": "https://github.com/halfbearman/opennept4une" - }) - logger.debug(f"Client Identified With Moonraker: {ret['result']['connection_id']}") - - system = (await self._send_moonraker_request("machine.system_info"))["result"]["system_info"] - self.ips = ", ".join(self._find_ips(system["network"])) - self._write("p[" + self._page_id(PAGE_SETTINGS_ABOUT) +"].b[16].txt=\"" + self.ips + "\"") + "url": "https://github.com/halfbearman/opennept4une", + }, + ) + logger.debug( + f"Client Identified With Moonraker: {ret['result']['connection_id']}" + ) + + system = (await self._send_moonraker_request("machine.system_info"))["result"][ + "system_info" + ] + self.display.ips = ", ".join(self._find_ips(system["network"])) def _make_rpc_msg(self, method: str, **kwargs): msg = {"jsonrpc": "2.0", "method": method} @@ -849,15 +938,16 @@ def handle_response(self, page, component): def handle_input(self, page, component, value): if page in input_actions: if component in input_actions[page]: - self.execute_action(input_actions[page][component].replace("$", str(value))) + self.execute_action( + input_actions[page][component].replace("$", str(value)) + ) return - + logger.info(f"Unhandled Input: {page} {component} {value}") + def handle_custom_touch(self, x, y): if self._get_current_page() in custom_touch_actions: - print(x, y) actions = custom_touch_actions[self._get_current_page()] for key in actions: - print(key) min_x, min_y, max_x, max_y = key if min_x < x and x < max_x and min_y < y and y < max_y: self.execute_action(actions[key]) @@ -881,16 +971,14 @@ async def display_event_handler(self, type, data): else: logger.info(f"Unhandled Event: {type} {data}") - async def _process_stream( - self, reader: asyncio.StreamReader - ) -> None: + async def _process_stream(self, reader: asyncio.StreamReader) -> None: errors_remaining: int = 10 while not reader.at_eof(): if self.klipper_restart_event.is_set(): await self._attempt_reconnect() self.klipper_restart_event.clear() try: - data = await reader.readuntil(b'\x03') + data = await reader.readuntil(b"\x03") decoded = data[:-1].decode(encoding="utf-8") item = json.loads(decoded) except (ConnectionError, asyncio.IncompleteReadError): @@ -929,7 +1017,10 @@ def handle_machine_config_change(self, new_data): max_z = int(new_data["config"]["stepper_z"]["position_max"]) if max_x > 0 and max_y > 0 and max_z > 0: - self._write(f'p[{self._page_id(PAGE_SETTINGS_ABOUT)}].b[9].txt="{max_x}x{max_y}x{max_z}"') + logger.info(f"Machine Size: {max_x}x{max_y}x{max_z}") + self._loop.create_task( + self.display.update_machine_size_ui(max_x, max_y, max_z) + ) if "bed_mesh" in new_data["config"]: if "probe_count" in new_data["config"]["bed_mesh"]: parts = new_data["config"]["bed_mesh"]["probe_count"].split(",") @@ -945,22 +1036,19 @@ def _get_current_page(self): if len(self.history) > 0: return self.history[-1] return None - + async def set_data_prepare_screen(self, filename): metadata = await self.load_metadata(filename) - self._write(f'p[{self._page_id(PAGE_CONFIRM_PRINT)}].b[3].font=2') - self._write(f'p[{self._page_id(PAGE_CONFIRM_PRINT)}].b[2].txt="{build_format_filename()(filename)}"') - info = [] - if "layer_height" in metadata: - info.append(f"Layer: {metadata['layer_height']}mm") - if "estimated_time" in metadata: - info.append(f"Time: {format_time(metadata['estimated_time'])}") - self._write(f'p[{self._page_id(PAGE_CONFIRM_PRINT)}].b[3].txt="{" ".join(info)}"') + await self.display.set_data_prepare_screen(filename, metadata) - await self.load_thumbnail_for_page(filename, self._page_id(PAGE_CONFIRM_PRINT), metadata) + await self.load_thumbnail_for_page( + filename, self._page_id(PAGE_CONFIRM_PRINT), metadata + ) async def load_metadata(self, filename): - metadata = await self._send_moonraker_request("server.files.metadata", {"filename": filename}) + metadata = await self._send_moonraker_request( + "server.files.metadata", {"filename": filename} + ) return metadata["result"] async def load_thumbnail_for_page(self, filename, page_number, metadata=None): @@ -976,7 +1064,7 @@ async def load_thumbnail_for_page(self, filename, page_number, metadata=None): best_thumbnail = thumbnail if best_thumbnail is None: if self._get_current_page() == page_number: - self._write("vis cp0,0", True) + await self.display.hide_thumbnail() return path = "/".join(filename.split("/")[:-1]) @@ -984,30 +1072,16 @@ async def load_thumbnail_for_page(self, filename, page_number, metadata=None): path = path + "/" path += best_thumbnail["relative_path"] - img = requests.get("http://localhost/server/files/gcodes/" + pathname2url(path), timeout=5) + img = requests.get( + f"{self.config.safe_get('general', 'moonraker_url', 'http://localhost:7125')}/server/files/gcodes/{pathname2url(path)}", timeout=5 + ) thumbnail = Image.open(io.BytesIO(img.content)) background = "29354a" if "thumbnails" in self.config: if "background_color" in self.config["thumbnails"]: background = self.config["thumbnails"]["background_color"] image = parse_thumbnail(thumbnail, 160, 160, background) - - parts = [] - start = 0 - end = 1024 - while (start + 1024 < len(image)): - parts.append(image[start:end]) - start = start + 1024 - end = end + 1024 - - parts.append(image[start:len(image)]) - self.is_blocking_serial = True - if self._get_current_page() == page_number: - self._write("vis cp0,1", True) - self._write("p[" + str(page_number) + "].cp0.close()", True) - for part in parts: - self._write("p[" + str(page_number) + "].cp0.write(\"" + str(part) + "\")", True) - self.is_blocking_serial = False + await self.display.display_thumbnail(page_number, image) def handle_status_update(self, new_data, data_mapping=None): if data_mapping is None: @@ -1017,67 +1091,140 @@ def handle_status_update(self, new_data, data_mapping=None): filename = new_data["print_stats"]["filename"] self.current_filename = filename if filename is not None and filename != "": - self._loop.create_task(self.load_thumbnail_for_page(self.current_filename, "19")) + self._loop.create_task( + self.load_thumbnail_for_page(self.current_filename, "19") + ) if "state" in new_data["print_stats"]: state = new_data["print_stats"]["state"] self.current_state = state logger.info(f"Status Update: {state}") current_page = self._get_current_page() if state == "printing" or state == "paused": - if state == "printing": - self._write(f'p[{self._page_id(PAGE_PRINTING)}].b[44].pic=68') - elif state == "paused": - self._write(f'p[{self._page_id(PAGE_PRINTING)}].b[44].pic=69') + self._loop.create_task(self.display.update_printing_state_ui(state)) if current_page is None or current_page not in PRINTING_PAGES: self._navigate_to_page(PAGE_PRINTING, clear_history=True) elif state == "complete": if current_page is None or current_page != PAGE_PRINTING_COMPLETE: self._navigate_to_page(PAGE_PRINTING_COMPLETE) else: - if current_page is None or current_page in PRINTING_PAGES or current_page == PAGE_PRINTING_COMPLETE or current_page == PAGE_OVERLAY_LOADING: + if ( + current_page is None + or current_page in PRINTING_PAGES + or current_page == PAGE_PRINTING_COMPLETE + or current_page == PAGE_OVERLAY_LOADING + ): self._navigate_to_page(PAGE_MAIN, clear_history=True) if "print_duration" in new_data["print_stats"]: self.current_print_duration = new_data["print_stats"]["print_duration"] - if "display_status" in new_data and "progress" in new_data["display_status"] and "print_duration" in new_data["print_stats"]: + if ( + "display_status" in new_data + and "progress" in new_data["display_status"] + and "print_duration" in new_data["print_stats"] + ): if new_data["display_status"]["progress"] > 0: - total_time = new_data["print_stats"]["print_duration"] / new_data["display_status"]["progress"] - self._write(f'p[{self._page_id(PAGE_PRINTING)}].b[37].txt="{format_time(total_time - new_data["print_stats"]["print_duration"])}"') - - if "output_pin Part_Light" in new_data: + total_time = ( + new_data["print_stats"]["print_duration"] + / new_data["display_status"]["progress"] + ) + self._loop.create_task( + self.display.update_time_remaining( + format_time( + total_time - new_data["print_stats"]["print_duration"] + ) + ) + ) + + if ( + "output_pin Part_Light" in new_data + and new_data["output_pin Part_Light"]["value"] is not None + ): self.part_light_state = int(new_data["output_pin Part_Light"]["value"]) == 1 - if "output_pin Frame_Light" in new_data: - self.part_light_state = int(new_data["output_pin Frame_Light"]["value"]) == 1 + if ( + "output_pin Frame_Light" in new_data + and new_data["output_pin Frame_Light"]["value"] is not None + ): + self.frame_light_state = ( + int(new_data["output_pin Frame_Light"]["value"]) == 1 + ) if "fan" in new_data: self.fan_state = float(new_data["fan"]["speed"]) > 0 self.printing_target_speeds["fan"] = float(new_data["fan"]["speed"]) - self.update_printing_speed_settings_ui() - if "filament_switch_sensor fila" in new_data: - self.filament_sensor_state = int(new_data["filament_switch_sensor fila"]["enabled"]) == 1 + self._loop.create_task( + self.display.update_printing_speed_settings_ui( + self.printing_selected_speed_type, + self.printing_target_speeds[self.printing_selected_speed_type], + ) + ) + if ( + f"filament_switch_sensor {self.filament_sensor_name}" in new_data + and new_data[f"filament_switch_sensor {self.filament_sensor_name}"][ + "enabled" + ] + is not None + ): + self.filament_sensor_state = ( + int( + new_data[f"filament_switch_sensor {self.filament_sensor_name}"][ + "enabled" + ] + ) + == 1 + ) if "configfile" in new_data: self.handle_machine_config_change(new_data["configfile"]) if "extruder" in new_data: if "target" in new_data["extruder"]: self.printing_target_temps["extruder"] = new_data["extruder"]["target"] - if "heater_generic heater_bed" in new_data: - if "target" in new_data["heater_bed"]: - self.printing_target_temps["heater_bed"] = new_data["heater_generic heater_bed"]["target"] + self.printer_heating_value_changed("extruder", new_data["extruder"]["target"]) + if "heater_bed" in new_data: #remove heater_generic + if "target" in new_data["heater_bed"]: + self.printing_target_temps["heater_bed"] = new_data[ + "heater_bed" + ]["target"] + self.printer_heating_value_changed("heater_bed", new_data["heater_bed"]["target"]) if "heater_generic heater_bed_outer" in new_data: if "target" in new_data["heater_generic heater_bed_outer"]: - self.printing_target_temps["heater_bed_outer"] = new_data["heater_generic heater_bed_outer"]["target"] + self.printing_target_temps["heater_bed_outer"] = new_data[ + "heater_generic heater_bed_outer" + ]["target"] + self.printer_heating_value_changed("heater_bed_outer", new_data["heater_generic heater_bed_outer"]["target"]) if "gcode_move" in new_data: if "extrude_factor" in new_data["gcode_move"]: - self.printing_target_speeds["flow"] = float(new_data["gcode_move"]["extrude_factor"]) - self.update_printing_speed_settings_ui() + self.printing_target_speeds["flow"] = float( + new_data["gcode_move"]["extrude_factor"] + ) + self._loop.create_task( + self.display.update_printing_speed_settings_ui( + self.printing_selected_speed_type, + self.printing_target_speeds[self.printing_selected_speed_type], + ) + ) if "speed_factor" in new_data["gcode_move"]: - self.printing_target_speeds["print"] = float(new_data["gcode_move"]["speed_factor"]) - self.update_printing_speed_settings_ui() + self.printing_target_speeds["print"] = float( + new_data["gcode_move"]["speed_factor"] + ) + self._loop.create_task( + self.display.update_printing_speed_settings_ui( + self.printing_selected_speed_type, + self.printing_target_speeds[self.printing_selected_speed_type], + ) + ) self._loop.create_task(self.display.update_data(new_data, data_mapping)) + def printer_heating_value_changed(self, heater, new_value): + if heater == self.printing_selected_heater: + self._loop.create_task( + self.display.update_printing_heater_settings_ui( + self.printing_selected_heater, + new_value, + ) + ) + async def close(self): if not self.connected: return @@ -1085,72 +1232,14 @@ async def close(self): self.writer.close() await self.writer.wait_closed() - def draw_initial_screw_leveling(self): - self._write("b[1].txt=\"Screws Tilt Adjust\"") - self._write("b[2].txt=\"Please Wait...\"") - self._write("b[3].txt=\"Heating...\"") - self._write("vis b[4],0") - self._write("vis b[5],0") - self._write("vis b[6],0") - self._write("vis b[7],0") - self._write("vis b[8],0") - self._write("fill 0,110,320,290,10665") - - def draw_completed_screw_leveling(self): - self._write("b[1].txt=\"Screws Tilt Adjust\"") - self._write("b[2].txt=\"Adjust the screws as indicated\"") - self._write("b[3].txt=\"01:20 means 1 turn and 20 mins\\rCW=clockwise\\rCCW=counter-clockwise\"") - self._write("vis b[4],0") - self._write("vis b[5],0") - self._write("vis b[6],0") - self._write("vis b[7],0") - self._write("vis b[8],1") - self._write("fill 0,110,320,290,10665") - self._write("xstr 12,320,100,20,1,65535,10665,1,1,1,\"front left\"") - self.draw_screw_level_info_at("12,340,100,20", self.screw_levels["front left"]) - - self._write("xstr 170,320,100,20,1,65535,10665,1,1,1,\"front right\"") - self.draw_screw_level_info_at("170,340,100,20", self.screw_levels["front right"]) - - self._write("xstr 170,120,100,20,1,65535,10665,1,1,1,\"rear right\"") - self.draw_screw_level_info_at("170,140,100,20", self.screw_levels["rear right"]) - - self._write("xstr 12,120,100,20,1,65535,10665,1,1,1,\"rear left\"") - self.draw_screw_level_info_at("12,140,100,20", self.screw_levels["rear left"]) - - if 'center right' in self.screw_levels: - self._write("xstr 12,220,100,30,1,65535,10665,1,1,1,\"center\\rright\"") - self.draw_screw_level_info_at("170,240,100,20", self.screw_levels["center right"]) - if 'center left' in self.screw_levels: - self._write("xstr 12,120,100,20,1,65535,10665,1,1,1,\"center\\rleft\"") - self.draw_screw_level_info_at("12,240,100,20", self.screw_levels["center left"]) - - self._write("xstr 96,215,100,50,1,65535,15319,1,1,1,\"Retry\"") - - def draw_screw_level_info_at(self, position, level): - if level == "base": - self._write(f"xstr {position},0,65535,10665,1,1,1,\"base\"") - else: - color = TEXT_SUCCESS if int(level[-2:]) < 5 else TEXT_ERROR - self._write(f"xstr {position},0,{color},10665,1,1,1,\"{level}\"") - async def handle_screw_leveling(self): self.leveling_mode = "screw" self.screw_levels = {} self.screw_probe_count = 0 - await self._send_moonraker_request("printer.gcode.script", {"script": "BED_LEVEL_SCREWS_TUNE"}) - self.draw_completed_screw_leveling() - - def draw_initial_zprobe_leveling(self): - self._write("p[137].b[19].txt=\"Z-Probe\"") - self._write("fill 0,250,320,320,10665") - self._write("fill 0,50,320,80,10665") - self.update_zprobe_leveling_ui() - - def update_zprobe_leveling_ui(self): - self._write("p[137].b[19].txt=\"Z-Probe\"") - self._write(f'p[137].b[11].pic={7 + ["0.01", "0.1", "1"].index(self.z_probe_step)}') - self._write(f'p[137].b[20].txt=\"{self.z_probe_distance}\"') + await self._send_moonraker_request( + "printer.gcode.script", {"script": "BED_LEVEL_SCREWS_TUNE"} + ) + await self.display.draw_completed_screw_leveling(self.screw_levels) async def handle_zprobe_leveling(self): if self.leveling_mode == "zprobe": @@ -1159,75 +1248,81 @@ async def handle_zprobe_leveling(self): self.z_probe_step = "0.1" self.z_probe_distance = "0.0" self._navigate_to_page(PAGE_OVERLAY_LOADING) - await self._send_moonraker_request("printer.gcode.script", {"script": "CALIBRATE_PROBE_Z_OFFSET"}) + await self._send_moonraker_request( + "printer.gcode.script", {"script": "CALIBRATE_PROBE_Z_OFFSET"} + ) self._go_back() - def draw_kamp_page(self): - self._write('fill 0,45,272,340,10665') - self._write('xstr 0,0,272,50,1,65535,10665,1,1,1,"Creating Bed Mesh"') - max_size = 264 # display width - 4px padding - x_probes = self.bed_leveling_counts[0] - y_probes = self.bed_leveling_counts[1] - spacing = 2 - self.bed_leveling_box_size = min(40, int(min(max_size / x_probes, max_size / y_probes) - spacing)) - total_width = (x_probes * (self.bed_leveling_box_size + spacing)) - spacing - total_height = (y_probes * (self.bed_leveling_box_size + spacing)) - spacing - self.bed_leveling_x_offset = 4 + (max_size - total_width) / 2 - self.bed_leveling_y_offset = 45 + (max_size - total_height) / 2 - for x in range(0, x_probes): - for y in range(0, y_probes): - self.draw_kamp_box(x, y, 17037) - - def draw_kamp_box_index(self, index, color): - if self.bed_leveling_counts[0] == 0: - return - row = int(index / self.bed_leveling_counts[0]) - inverted_row = (self.bed_leveling_counts[1]-1) - row - col = index % self.bed_leveling_counts[0] - if row % 2 == 1: - col = self.bed_leveling_counts[0] - 1 - col - self.draw_kamp_box(col, inverted_row, color) - - def draw_kamp_box(self, x, y, color): - box_size = self.bed_leveling_box_size - if box_size > 0: - self._write(f'fill {int(self.bed_leveling_x_offset+x*(box_size+2))},{47+y*(box_size+2)},{box_size},{box_size},{color}') - def handle_gcode_response(self, response): if self.leveling_mode == "screw": if "probe at" in response: self.screw_probe_count += 1 - self._write(f"b[3].txt=\"Probing Screws ({ceil(self.screw_probe_count/2)}/4)...\"") + self._loop.create_task( + self.display.update_screw_level_description( + f"Probing Screws ({ceil(self.screw_probe_count/2)}/4)..." + ) + ) if "screw (base) :" in response: self.screw_levels[response.split("screw")[0][3:].strip()] = "base" if "screw :" in response: - self.screw_levels[response.split("screw")[0][3:].strip()] = response.split("adjust")[1].strip() + self.screw_levels[ + response.split("screw")[0][3:].strip() + ] = response.split("adjust")[1].strip() elif self.leveling_mode == "zprobe": if "Z position:" in response: self.z_probe_distance = response.split("->")[1].split("<-")[0].strip() - self.update_zprobe_leveling_ui() + self._loop.create_task( + self.display.update_zprobe_leveling_ui( + self.z_probe_step, self.z_probe_distance + ) + ) elif "Adapted probe count:" in response: parts = response.split(":")[1].split(",") x_count = int(parts[0].strip(" ()")) y_count = int(parts[1][:-1].strip(" ()")) self.bed_leveling_counts = [x_count, y_count] elif response.startswith("// bed_mesh: generated points"): + self.bed_leveling_probed_count = 0 if self._get_current_page() != PAGE_PRINTING_KAMP: self._navigate_to_page(PAGE_PRINTING_KAMP) elif response.startswith("// probe at "): + if self._get_current_page() != PAGE_PRINTING_KAMP: + # We are not leveling, likely response came from manual probe e.g. from console, + # Skip updating the state, otherwise it messes up bed leveling screen when printing + return new_position = response.split(" ")[3] if self.bed_leveling_last_position != new_position: self.bed_leveling_last_position = new_position if self.bed_leveling_probed_count > 0: - self.draw_kamp_box_index(self.bed_leveling_probed_count - 1, BACKGROUND_SUCCESS) + self._loop.create_task( + self.display.draw_kamp_box_index( + self.bed_leveling_probed_count - 1, + BACKGROUND_SUCCESS, + self.bed_leveling_counts, + ) + ) self.bed_leveling_probed_count += 1 - self.draw_kamp_box_index(self.bed_leveling_probed_count - 1, BACKGROUND_WARNING) - self._write(f'xstr 0,310,320,50,1,65535,10665,1,1,1,"Probing... ({self.bed_leveling_probed_count}/{self.bed_leveling_counts[0]*self.bed_leveling_counts[1]})"') + self._loop.create_task( + self.display.draw_kamp_box_index( + self.bed_leveling_probed_count - 1, + BACKGROUND_WARNING, + self.bed_leveling_counts, + ) + ) + self._loop.create_task( + self.display.update_kamp_text( + f"Probing... ({self.bed_leveling_probed_count}/{self.bed_leveling_counts[0]*self.bed_leveling_counts[1]})" + ) + ) elif response.startswith("// Mesh Bed Leveling Complete"): self.bed_leveling_probed_count = 0 self.bed_leveling_counts = self.full_bed_leveling_counts if self._get_current_page() == PAGE_PRINTING_KAMP: - self._go_back() + if self.leveling_mode == "full_bed": + self._loop.create_task(self.display.show_bed_mesh_final()) + else: + self._go_back() + loop = asyncio.get_event_loop() config_observer = Observer() @@ -1243,16 +1338,22 @@ def handle_wd_callback(notifier): def handle_sock_changes(notifier): if notifier.event_type == "created": - logger.info(f"{notifier.src_path.split('/')[-1]} created. Attempting to reconnect...") + logger.info( + f"{notifier.src_path.split('/')[-1]} created. Attempting to reconnect..." + ) controller.klipper_restart_event.set() config_patterns = ["display_connector.cfg"] - config_event_handler = PatternMatchingEventHandler(config_patterns, None, True, True) + config_event_handler = PatternMatchingEventHandler( + config_patterns, None, True, True + ) config_event_handler.on_modified = handle_wd_callback config_event_handler.on_created = handle_wd_callback socket_patterns = ["klippy.sock", "moonraker.sock"] - socket_event_handler = PatternMatchingEventHandler(socket_patterns, None, True, True) + socket_event_handler = PatternMatchingEventHandler( + socket_patterns, None, True, True + ) socket_event_handler.on_modified = handle_sock_changes socket_event_handler.on_created = handle_sock_changes socket_event_handler.on_deleted = handle_sock_changes diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..310b5cf --- /dev/null +++ b/install.sh @@ -0,0 +1,183 @@ +#!/bin/bash + +SCRIPT=$(realpath "$0") +SCRIPTPATH=$(dirname "$SCRIPT") + +OLD_SERVICE_FILE="/etc/systemd/system/OpenNept4une.service" +SERVICE_FILE="/etc/systemd/system/display.service" +SCRIPT_PATH="$SCRIPTPATH/display.py" +VENV_PATH="$SCRIPTPATH/venv" +LOG_FILE="/var/log/display.log" +MOONRAKER_ASVC="$HOME/printer_data/moonraker.asvc" + +R=$'\e[1;91m' # Red ${R} +G=$'\e[1;92m' # Green ${G} +Y=$'\e[1;93m' # Yellow ${Y} +M=$'\e[1;95m' # Magenta ${M} +C=$'\e[96m' # Cyan ${C} +NC=$'\e[0m' # No Color ${NC} + +reset_env() { + sudo rm -rf "$SCRIPTPATH/venv" + rm -rf "$SCRIPTPATH/__pycache__" +} + +install_env() { + sudo apt update + sudo apt install python3-venv -y + + # Create and activate a Python virtual environment + echo "Creating and activating a virtual environment..." + python3 -m venv "$SCRIPTPATH/venv" + source "$SCRIPTPATH/venv/bin/activate" + + # Install required Python packages + echo "Installing required Python packages..." + pip install -r "$SCRIPTPATH/requirements.txt" + + # Deactivate the virtual environment + echo "Deactivating the virtual environment..." + deactivate + + echo "Environment setup completed." +} + +stop_service() { + if systemctl is-active --quiet OpenNept4une; then + # Stop the service silently + sudo service OpenNept4une stop >/dev/null 2>&1 + # Disable the service silently + sudo service OpenNept4une disable >/dev/null 2>&1 + sudo rm -f $OLD_SERVICE_FILE + fi + + if systemctl is-active --quiet display; then + # Stop the service silently + sudo service display stop >/dev/null 2>&1 + fi +} + +disable_service() { + echo "Disabling the service..." + sudo systemctl disable display.service +} + +create_service() { + # Create the systemd service file + echo "Creating systemd service file at $SERVICE_FILE..." + cat < /dev/null +[Unit] +Description=OpenNept4une TouchScreen Display Service +After=klipper.service klipper-mcu.service moonraker.service +Wants=klipper.service moonraker.service +Documentation=man:display(8) + +[Service] +ExecStartPre=/bin/sleep 10 +ExecStart=$SCRIPTPATH/venv/bin/python $SCRIPTPATH/display.py +WorkingDirectory=$SCRIPTPATH +Restart=on-failure +CPUQuota=50% +RestartSec=10 +User=$USER +ProtectSystem=full +PrivateTmp=true + +[Install] +WantedBy=multi-user.target +EOF + + # Reload systemd to read new service file + echo "Reloading systemd..." + sudo systemctl daemon-reload + + # Enable and start the service + echo "Enabling and starting the service..." + sudo systemctl enable display.service + sudo systemctl start display.service +} + +give_moonraker_control() { + echo "Allowing Moonraker to control display service" + grep -qxF 'display' $MOONRAKER_ASVC || echo 'display' >> $MOONRAKER_ASVC + + sudo service moonraker restart +} + +moonraker_update_manager() { + update_selection="display" + config_file="$HOME/printer_data/config/moonraker.conf" + + if [ "$update_selection" = "OpenNept4une" ]; then + new_lines="[update_manager $update_selection]\n\ +type: git_repo\n\ +primary_branch: $current_branch\n\ +path: $OPENNEPT4UNE_DIR\n\ +is_system_service: False\n\ +origin: $OPENNEPT4UNE_REPO" + + elif [ "$update_selection" = "display" ]; then + current_display_branch=$(git -C "$DISPLAY_CONNECTOR_DIR" symbolic-ref --short HEAD 2>/dev/null) + new_lines="[update_manager $update_selection]\n\ +type: git_repo\n\ +primary_branch: $current_display_branch\n\ +path: $DISPLAY_CONNECTOR_DIR\n\ +virtualenv: $DISPLAY_CONNECTOR_DIR/venv\n\ +requirements: requirements.txt\n\ +origin: $DISPLAY_CONNECTOR_REPO" + else + echo -e "${R}Invalid argument. Please specify either 'OpenNept4une' or 'display_connector'.${NC}" + return 1 + fi + # Check if the lines exist in the config file + if grep -qF "[update_manager $update_selection]" "$config_file"; then + # Lines exist, update them + perl -pi.bak -e "BEGIN{undef $/;} s|\[update_manager $update_selection\].*?((?:\r*\n){2}\|$)|$new_lines\$1|gs" "$config_file" + sync + else + # Lines do not exist, append them to the end of the file + echo -e "\n$new_lines" >> "$config_file" + fi +} + +if [ ! -f "$SCRIPT_PATH" ]; then + echo "${R}Error: Script $SCRIPT_PATH not found.${NC}" + exit 1 +fi + +HELP="Please specify either 'full', 'env', 'service-install', 'service-disable' or 'moonraker'." + +if [ "$1" ]; then + case "$1" in + "full") + reset_env + install_env + stop_service + create_service + give_moonraker_control + ;; + "env") + reset_env + install_env + ;; + "service-install") + stop_service + create_service + ;; + "service-disable") + stop_service + disable_service + ;; + "moonraker") + give_moonraker_control + moonraker_update_manager + ;; + *) + echo "${R}Invalid argument $1. ${Y}$HELP${NC}" + exit 1 + ;; + esac +else + echo "${R}No arguments provided. ${Y}$HELP${NC}" + exit 1 +fi \ No newline at end of file diff --git a/src/communicator.py b/src/communicator.py index a481e1e..64d3743 100644 --- a/src/communicator.py +++ b/src/communicator.py @@ -6,15 +6,23 @@ class DisplayCommunicator: supported_firmware_versions = [] current_data = {} + ips = "--" + def __init__( self, logger: Logger, + model: str, port: str, event_handler, baudrate: int = 115200, timeout: int = 5, ) -> None: + self.display_name_override = None + self.display_name_line_color = None + self.z_display = "mm" + self.logger = logger + self.model = model self.port = port self.baudrate = baudrate self.timeout = timeout @@ -41,6 +49,9 @@ async def check_valid_version(self): return False return True + def get_device_name(self): + return self.model + def get_current_data(self, path): index = 0 current = self.current_data @@ -51,6 +62,9 @@ def get_current_data(self, path): return None return current + async def navigate_to(self, page_id): + await self.write(f"page {page_id}") + async def update_data(self, new_data, data_mapping=None, current_data=None): if data_mapping is None: data_mapping = self.data_mapping @@ -75,7 +89,9 @@ async def update_data(self, new_data, data_mapping=None, current_data=None): self.get_current_data(required_field) for required_field in mapping_leaf.required_fields ] - formatted = mapping_leaf.format_with_required(value, *required_values) + formatted = mapping_leaf.format_with_required( + value, *required_values + ) for mapped_key in mapping_leaf.fields: if mapping_leaf.field_type == "txt": await self.write( diff --git a/src/config.py b/src/config.py index df34f81..702eb16 100644 --- a/src/config.py +++ b/src/config.py @@ -1,5 +1,5 @@ import os -from configparser import ConfigParser +from configparser import ConfigParser, NoOptionError, NoSectionError TEMP_DEFAULTS = { "pla": [210, 60], @@ -28,6 +28,17 @@ def write_changes(self): with open(self.file_path, "w") as configfile: self.write(configfile) + def safe_get(self, section, key, default=None): + try: + return self.get(section, key) + except KeyError: + return default + except NoSectionError: + return default + except NoOptionError: + return default + + def initialize_config_file(self): if not os.path.exists(self.file_path): self.logger.info("Creating config file") @@ -35,7 +46,7 @@ def initialize_config_file(self): self.set( "general", "clean_filename_regex", - ".*_(.*?_(?:[0-9]+h|[0-9]+m|[0-9]+s)+\.gcode)", + r".*_(.*?_(?:[0-9]+h|[0-9]+m|[0-9]+s)+\.gcode)", ) self.add_section("LOGGING") self.set("LOGGING", "file_log_level", "ERROR") @@ -89,4 +100,3 @@ def initialize_config_file(self): with open(self.file_path, "w") as configfile: self.write(configfile) - diff --git a/src/elegoo_custom.py b/src/elegoo_custom.py new file mode 100644 index 0000000..c2a5979 --- /dev/null +++ b/src/elegoo_custom.py @@ -0,0 +1,19 @@ +from logging import Logger + +from src.elegoo_display import ElegooDisplayCommunicator, ElegooDisplayMapper + +MODEL_CUSTOM = "Custom" + + +class CustomDisplayCommunicator(ElegooDisplayCommunicator): + def __init__( + self, + logger: Logger, + model: str, + event_handler, + port: str, + baudrate: int = 115200, + timeout: int = 5, + ) -> None: + super().__init__(logger, model, port, event_handler, baudrate, timeout) + self.mapper = ElegooDisplayMapper() diff --git a/src/elegoo_display.py b/src/elegoo_display.py index cc06819..b00adb8 100644 --- a/src/elegoo_display.py +++ b/src/elegoo_display.py @@ -1,6 +1,52 @@ from src.communicator import DisplayCommunicator -from src.mapping import Mapper, MappingLeaf, build_accessor, build_format_filename, PAGE_MAIN, PAGE_FILES, PAGE_PREPARE_MOVE, PAGE_PREPARE_TEMP, PAGE_PREPARE_EXTRUDER, PAGE_SETTINGS, PAGE_SETTINGS_LANGUAGE, PAGE_SETTINGS_TEMPERATURE, PAGE_SETTINGS_TEMPERATURE_SET, PAGE_SETTINGS_ABOUT, PAGE_SETTINGS_ADVANCED, PAGE_LEVELING, PAGE_LEVELING_SCREW_ADJUST, PAGE_LEVELING_Z_OFFSET_ADJUST, PAGE_CONFIRM_PRINT, PAGE_PRINTING, PAGE_PRINTING_KAMP, PAGE_PRINTING_PAUSE, PAGE_PRINTING_STOP, PAGE_PRINTING_EMERGENCY_STOP, PAGE_PRINTING_COMPLETE, PAGE_PRINTING_FILAMENT, PAGE_PRINTING_SPEED, PAGE_PRINTING_ADJUST, PAGE_PRINTING_FILAMENT_RUNOUT, PAGE_PRINTING_DIALOG_SPEED, PAGE_PRINTING_DIALOG_FAN, PAGE_PRINTING_DIALOG_FLOW, PAGE_OVERLAY_LOADING, PAGE_LIGHTS, PAGE_SHUTDOWN_DIALOG, format_temp, format_time, format_percent -from src.colors import BACKGROUND_GRAY, TEXT_WARNING +from src.mapping import ( + Mapper, + MappingLeaf, + build_accessor, + build_format_filename, + PAGE_MAIN, + PAGE_FILES, + PAGE_PREPARE_MOVE, + PAGE_PREPARE_TEMP, + PAGE_PREPARE_EXTRUDER, + PAGE_SETTINGS, + PAGE_SETTINGS_LANGUAGE, + PAGE_SETTINGS_TEMPERATURE, + PAGE_SETTINGS_TEMPERATURE_SET, + PAGE_SETTINGS_ABOUT, + PAGE_SETTINGS_ADVANCED, + PAGE_LEVELING, + PAGE_LEVELING_SCREW_ADJUST, + PAGE_LEVELING_Z_OFFSET_ADJUST, + PAGE_CONFIRM_PRINT, + PAGE_PRINTING, + PAGE_PRINTING_KAMP, + PAGE_PRINTING_PAUSE, + PAGE_PRINTING_STOP, + PAGE_PRINTING_EMERGENCY_STOP, + PAGE_PRINTING_COMPLETE, + PAGE_PRINTING_FILAMENT, + PAGE_PRINTING_SPEED, + PAGE_PRINTING_ADJUST, + PAGE_PRINTING_FILAMENT_RUNOUT, + PAGE_PRINTING_DIALOG_SPEED, + PAGE_PRINTING_DIALOG_FAN, + PAGE_PRINTING_DIALOG_FLOW, + PAGE_OVERLAY_LOADING, + PAGE_LIGHTS, + PAGE_SHUTDOWN_DIALOG, + format_temp, + format_time, + format_percent, +) +from src.colors import ( + BACKGROUND_DIALOG, + BACKGROUND_GRAY, + TEXT_ERROR, + TEXT_SUCCESS, + TEXT_WARNING, +) +from src.wifi import get_wlan0_status class ElegooDisplayMapper(Mapper): @@ -127,13 +173,19 @@ def __init__(self) -> None: "print_stats": { "print_duration": [ MappingLeaf( - [build_accessor(self.map_page(PAGE_PRINTING), "6")], + [ + build_accessor(self.map_page(PAGE_PRINTING), "6"), + build_accessor(self.map_page(PAGE_PRINTING_COMPLETE), "4"), + ], formatter=format_time, ) ], "filename": [ MappingLeaf( - [build_accessor(self.map_page(PAGE_PRINTING), "t0")], + [ + build_accessor(self.map_page(PAGE_PRINTING), "t0"), + build_accessor(self.map_page(PAGE_PRINTING_COMPLETE), "3"), + ], formatter=build_format_filename("printing"), ) ], @@ -228,46 +280,53 @@ def __init__(self) -> None: ) ] }, - "filament_switch_sensor fila": { - "enabled": [ - MappingLeaf( - [ - build_accessor(self.map_page(PAGE_SETTINGS), "11"), - build_accessor(self.map_page(PAGE_PRINTING_ADJUST), "16"), - ], - field_type="pic", - formatter=lambda x: "77" if int(x) == 1 else "76", - ) - ] - }, } + self.set_z_display("mm") def set_z_display(self, value): - if value == "mm": - self.data_mapping["motion_report"]["live_position"][2] = [ - MappingLeaf( - [ - build_accessor(self.map_page(PAGE_MAIN), "z_pos"), - build_accessor(self.map_page(PAGE_PRINTING), "zvalue"), - ] - ) - ] - elif value == "layer": + if value == "layer": self.data_mapping["print_stats"]["info"] = { "current_layer": [ MappingLeaf( [build_accessor(self.map_page(PAGE_PRINTING), "zvalue")], required_fields=[["print_stats", "info", "total_layer"]], - formatter=lambda current, total: f"{current:.0f}/{total:.0f}" if current is not None and total is not None else '0/0', + formatter=lambda current, total: f"{current:.0f}/{total:.0f}" + if current is not None and total is not None + else "0/0", ) ], "total_layer": [MappingLeaf([])], } + else: + self.data_mapping["motion_report"]["live_position"][2] = [ + MappingLeaf( + [ + build_accessor(self.map_page(PAGE_MAIN), "z_pos"), + build_accessor(self.map_page(PAGE_PRINTING), "zvalue"), + ] + ) + ] + + def set_filament_sensor_name(self, value): + self.data_mapping[f"filament_switch_sensor {value}"] = { + "enabled": [ + MappingLeaf( + [ + build_accessor(self.map_page(PAGE_SETTINGS), "11"), + build_accessor(self.map_page(PAGE_PRINTING_ADJUST), "16"), + ], + field_type="pic", + formatter=lambda x: "77" if int(x) == 1 else "76", + ) + ] + } class ElegooDisplayCommunicator(DisplayCommunicator): supported_firmware_versions = ["1.2.11", "1.2.12"] + bed_leveling_box_size = 20 + async def get_firmware_version(self) -> str: return await self.display.get("p[35].b[11].txt", self.timeout) @@ -277,3 +336,429 @@ async def check_valid_version(self): await self.write( f'xstr 0,464,320,16,2,{TEXT_WARNING},{BACKGROUND_GRAY},1,1,1,"WARNING: Unsupported Display Firmware Version"' ) + + async def special_page_handling(self, current_page): + if current_page == PAGE_MAIN: + has_wifi = await self.update_wifi_ui() + if self.has_two_beds: + await self.write("vis out_bedtemp,1") + if self.display_name_override: + display_name = self.display_name_override + if display_name == "MODEL_NAME": + display_name = self.get_device_name() + await self.write( + "xstr 12,20,180,20,1,65535," + + str(BACKGROUND_GRAY) + + ',0,1,1,"' + + display_name + + '"' + ) + if self.display_name_line_color: + await self.write("fill 13,47,24,4," + str(self.display_name_line_color)) + + await self.write(f"xpic {200 if has_wifi else 230},16,30,30,220,200,51") + elif current_page == PAGE_SETTINGS_ABOUT: + await self.write( + "p[" + + self.mapper.map_page(PAGE_SETTINGS_ABOUT) + + '].b[16].txt="' + + self.ips + + '"' + ) + await self.write("fill 0,400,320,60," + str(BACKGROUND_GRAY)) + await self.write( + "xstr 0,400,320,30,1,65535," + + str(BACKGROUND_GRAY) + + ',1,1,1,"OpenNept4une"' + ) + await self.write( + "xstr 0,430,320,30,2,GRAY," + + str(BACKGROUND_GRAY) + + ',1,1,1,"github.com/OpenNeptune3D"' + ) + elif current_page == PAGE_LIGHTS: + await self.write("b[3].txt=\"Part Light\"") + await self.write("b[4].txt=\"Frame Light\"") + elif current_page == PAGE_PRINTING: + await self.write("printvalue.xcen=0") + await self.write("move printvalue,13,267,13,267,0,10") + await self.write("vis b[16],0") + elif current_page == PAGE_PRINTING_COMPLETE: + await self.write('b[4].txt="Print Completed!"') + elif current_page == PAGE_PRINTING_ADJUST: + await self.write('b[20].txt="' + self.ips + '"') + elif current_page == PAGE_LEVELING: + await self.write('b[12].txt="Leveling"') + await self.write('b[18].txt="Screws Tilt Adjust"') + await self.write('b[19].txt="Z-Probe Offset"') + await self.write('b[20].txt="Full Bed Level"') + self.leveling_mode = None + elif current_page == PAGE_PRINTING_DIALOG_SPEED: + await self.write("b[3].maxval=200") + elif current_page == PAGE_PRINTING_DIALOG_FLOW: + await self.write("b[3].maxval=200") + elif current_page == PAGE_SHUTDOWN_DIALOG: + await self.write("fill 20,100,232,240," + str(BACKGROUND_DIALOG)) + await self.write('xstr 24,104,224,50,1,65535,10665,1,1,1,"Shut Down Host"') + await self.write('xstr 24,158,224,50,1,65535,10665,1,1,1,"Reboot Host"') + await self.write('xstr 24,212,224,50,1,65535,10665,1,1,1,"Reboot Klipper"') + await self.write('xstr 24,286,224,50,1,65535,10665,1,1,1,"Back"') + + async def update_printing_heater_settings_ui( + self, printing_selected_heater, printing_target_temp + ): + if self.has_two_beds: + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].b0.picc=" + + str(90 if printing_selected_heater == "extruder" else 89) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].b1.picc=" + + str(90 if printing_selected_heater == "heater_bed" else 89) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].b2.picc=" + + str(90 if printing_selected_heater == "heater_bed_outer" else 89) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].targettemp.txt=\"" + + f"{printing_target_temp:.0f}\"" + ) + + else: + await self.write( + f'p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].b[13].pic={54 + ["extruder", "heater_bed"].index(printing_selected_heater)}' + ) + await self.write( + f'p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].b[35].txt="' + + f"{printing_target_temp:.0f}" + + '"' + ) + + async def update_printing_temperature_increment_ui( + self, printing_selected_temp_increment + ): + await self.write( + f'p[{self.mapper.map_page(PAGE_PRINTING_FILAMENT)}].p1.pic={56 + ["1", "5", "10"].index(printing_selected_temp_increment)}' + ) + + async def update_printing_speed_settings_ui( + self, printing_selected_speed_type, printing_target_speed + ): + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].b0.picc=" + + str(59 if printing_selected_speed_type == "print" else 58) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].b1.picc=" + + str(59 if printing_selected_speed_type == "flow" else 58) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].b2.picc=" + + str(59 if printing_selected_speed_type == "fan" else 58) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].targetspeed.val={printing_target_speed*100:.0f}" + ) + + async def update_printing_speed_increment_ui( + self, printing_selected_speed_increment + ): + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].b[14].picc=" + + str(59 if printing_selected_speed_increment == "1" else 58) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].b[15].picc=" + + str(59 if printing_selected_speed_increment == "5" else 58) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_SPEED)}].b[16].picc=" + + str(59 if printing_selected_speed_increment == "10" else 58) + ) + + async def update_printing_zoffset_increment_ui(self, z_offset_distance): + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_ADJUST)}].b[23].picc=" + + str(36 if z_offset_distance == "0.01" else 65) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_ADJUST)}].b[24].picc=" + + str(36 if z_offset_distance == "0.1" else 65) + ) + await self.write( + f"p[{self.mapper.map_page(PAGE_PRINTING_ADJUST)}].b[25].picc=" + + str(36 if z_offset_distance == "1" else 65) + ) + + async def update_preset_temp_ui( + self, + temperature_preset_step, + temperature_preset_extruder, + temperature_preset_bed, + ): + await self.write( + f"p[{self.mapper.map_page(PAGE_SETTINGS_TEMPERATURE_SET)}].b[7].pic={56 + [1, 5, 10].index(temperature_preset_step)}" + ) + await self.write( + f'p[{self.mapper.map_page(PAGE_SETTINGS_TEMPERATURE_SET)}].b[18].txt="{temperature_preset_extruder}"' + ) + await self.write( + f'p[{self.mapper.map_page(PAGE_SETTINGS_TEMPERATURE_SET)}].b[19].txt="{temperature_preset_bed}"' + ) + + async def update_prepare_move_ui(self, move_distance): + await self.write( + f'p[{self.mapper.map_page(PAGE_PREPARE_MOVE)}].p0.pic={10 + ["0.1", "1", "10"].index(move_distance)}' + ) + + async def update_prepare_extrude_ui(self, extrude_amount, extrude_speed): + await self.write( + f'p[{self.mapper.map_page(PAGE_PREPARE_EXTRUDER)}].b[8].txt="{extrude_amount}"' + ) + await self.write( + f'p[{self.mapper.map_page(PAGE_PREPARE_EXTRUDER)}].b[9].txt="{extrude_speed}"' + ) + + async def update_wifi_ui(self): + has_wifi, ssid, rssi_category = get_wlan0_status() + if not has_wifi: + await self.write("picq 230,0,42,42,214") + return False + if ssid is None: + await self.write("picq 230,0,42,42,313") + else: + await self.write(f"picq 230,0,42,42,{313 + rssi_category}") + return True + + async def show_files_page(self, current_dir, dir_contents, files_page): + page_size = 5 + title = current_dir.split("/")[-1] + if title == "": + title = "Files" + file_count = len(dir_contents) + if file_count == 0: + await self.write( + f'p[{self.mapper.map_page(PAGE_FILES)}].b[11].txt="{title} (Empty)"' + ) + else: + await self.write( + f'p[{self.mapper.map_page(PAGE_FILES)}].b[11].txt="{title} ({(files_page * page_size) + 1}-{min((files_page * page_size) + page_size, file_count)}/{file_count})"' + ) + component_index = 0 + for index in range( + files_page * page_size, min(len(dir_contents), (files_page + 1) * page_size) + ): + file = dir_contents[index] + await self.write( + f'p[{self.mapper.map_page(PAGE_FILES)}].b[{component_index + 18}].txt="{file["name"]}"' + ) + if file["type"] == "dir": + await self.write( + f"p[{self.mapper.map_page(PAGE_FILES)}].b[{component_index + 13}].pic=194" + ) + else: + await self.write( + f"p[{self.mapper.map_page(PAGE_FILES)}].b[{component_index + 13}].pic=193" + ) + component_index += 1 + for index in range(component_index, page_size): + await self.write( + f"p[{self.mapper.map_page(PAGE_FILES)}].b[{index + 13}].pic=195" + ) + await self.write( + f'p[{self.mapper.map_page(PAGE_FILES)}].b[{index + 18}].txt=""' + ) + + async def update_printing_state_ui(self, state): + if state == "printing": + await self.write(f"p[{self.mapper.map_page(PAGE_PRINTING)}].b[44].pic=68") + elif state == "paused": + await self.write(f"p[{self.mapper.map_page(PAGE_PRINTING)}].b[44].pic=69") + + async def set_data_prepare_screen(self, filename, metadata): + await self.write(f"p[{self.mapper.map_page(PAGE_CONFIRM_PRINT)}].b[3].font=2") + await self.write( + f'p[{self.mapper.map_page(PAGE_CONFIRM_PRINT)}].b[2].txt="{build_format_filename()(filename)}"' + ) + info = [] + if "layer_height" in metadata: + info.append(f"Layer: {metadata['layer_height']}mm") + if "estimated_time" in metadata: + info.append(f"Time: {format_time(metadata['estimated_time'])}") + await self.write( + f'p[{self.mapper.map_page(PAGE_CONFIRM_PRINT)}].b[3].txt="{" ".join(info)}"' + ) + + async def draw_initial_screw_leveling(self): + await self.write('b[1].txt="Screws Tilt Adjust"') + await self.write('b[2].txt="Please Wait..."') + await self.write('b[3].txt="Heating..."') + await self.write("vis b[4],0") + await self.write("vis b[5],0") + await self.write("vis b[6],0") + await self.write("vis b[7],0") + await self.write("vis b[8],0") + await self.write("fill 0,110,320,290,10665") + + async def draw_completed_screw_leveling(self, screw_levels): + await self.write('b[1].txt="Screws Tilt Adjust"') + await self.write('b[2].txt="Adjust the screws as indicated"') + await self.write( + 'b[3].txt="01:20 means 1 turn and 20 mins\\rCW=clockwise\\rCCW=counter-clockwise"' + ) + await self.write("vis b[4],0") + await self.write("vis b[5],0") + await self.write("vis b[6],0") + await self.write("vis b[7],0") + await self.write("vis b[8],1") + await self.write("fill 0,110,320,290,10665") + await self.write('xstr 12,320,100,20,1,65535,10665,1,1,1,"front left"') + await self.draw_screw_level_info_at("12,340,100,20", screw_levels["front left"]) + + await self.write('xstr 170,320,100,20,1,65535,10665,1,1,1,"front right"') + await self.draw_screw_level_info_at( + "170,340,100,20", screw_levels["front right"] + ) + + await self.write('xstr 170,120,100,20,1,65535,10665,1,1,1,"rear right"') + await self.draw_screw_level_info_at( + "170,140,100,20", screw_levels["rear right"] + ) + + await self.write('xstr 12,120,100,20,1,65535,10665,1,1,1,"rear left"') + await self.draw_screw_level_info_at("12,140,100,20", screw_levels["rear left"]) + + if "center right" in screw_levels: + await self.write('xstr 12,220,100,30,1,65535,10665,1,1,1,"center\\rright"') + await self.draw_screw_level_info_at( + "170,240,100,20", screw_levels["center right"] + ) + if "center left" in screw_levels: + await self.write('xstr 12,120,100,20,1,65535,10665,1,1,1,"center\\rleft"') + await self.draw_screw_level_info_at( + "12,240,100,20", screw_levels["center left"] + ) + + await self.write('xstr 96,215,100,50,1,65535,15319,1,1,1,"Retry"') + + async def draw_screw_level_info_at(self, position, level): + if level == "base": + await self.write(f'xstr {position},0,65535,10665,1,1,1,"base"') + else: + color = TEXT_SUCCESS if int(level[-2:]) < 5 else TEXT_ERROR + await self.write(f'xstr {position},0,{color},10665,1,1,1,"{level}"') + + async def update_screw_level_description(self, text): + await self.write(f'b[3].txt="${text}"') + + async def draw_initial_zprobe_leveling(self, z_probe_step, z_probe_distance): + await self.write('p[137].b[19].txt="Z-Probe"') + await self.write("fill 0,250,320,320,10665") + await self.write("fill 0,50,320,80,10665") + await self.update_zprobe_leveling_ui(z_probe_step, z_probe_distance) + + async def update_zprobe_leveling_ui(self, z_probe_step, z_probe_distance): + await self.write('p[137].b[19].txt="Z-Probe"') + await self.write( + f'p[137].b[11].pic={7 + ["0.01", "0.1", "1"].index(z_probe_step)}' + ) + await self.write(f'p[137].b[20].txt="{z_probe_distance}"') + + async def draw_kamp_page(self, bed_leveling_counts): + await self.write("fill 0,45,272,340,10665") + await self.write('xstr 0,0,272,50,1,65535,10665,1,1,1,"Creating Bed Mesh"') + max_size = 264 # display width - 4px padding + x_probes = bed_leveling_counts[0] + y_probes = bed_leveling_counts[1] + spacing = 2 + self.bed_leveling_box_size = min( + 40, int(min(max_size / x_probes, max_size / y_probes) - spacing) + ) + total_width = (x_probes * (self.bed_leveling_box_size + spacing)) - spacing + total_height = (y_probes * (self.bed_leveling_box_size + spacing)) - spacing + self.bed_leveling_x_offset = 4 + (max_size - total_width) / 2 + self.bed_leveling_y_offset = 45 + (max_size - total_height) / 2 + for x in range(0, x_probes): + for y in range(0, y_probes): + await self.draw_kamp_box(x, y, 17037) + + async def update_kamp_text(self, text): + await self.write(f'xstr 0,310,320,30,1,65535,10665,1,1,1,"{text}"') + + async def draw_kamp_box_index(self, index, color, bed_leveling_counts): + if bed_leveling_counts[0] == 0: + return + row = int(index / bed_leveling_counts[0]) + inverted_row = (bed_leveling_counts[1] - 1) - row + col = index % bed_leveling_counts[0] + if row % 2 == 1: + col = bed_leveling_counts[0] - 1 - col + await self.draw_kamp_box(col, inverted_row, color) + + async def draw_kamp_box(self, x, y, color): + box_size = self.bed_leveling_box_size + if box_size > 0: + await self.write( + f"fill {int(self.bed_leveling_x_offset+x*(box_size+2))},{47+y*(box_size+2)},{box_size},{box_size},{color}" + ) + + async def update_klipper_version_ui(self, software_version): + await self.write( + "p[" + + self.mapper.map_page(PAGE_SETTINGS_ABOUT) + + '].b[10].txt="' + + software_version + + '"' + ) + + async def update_machine_size_ui(self, max_x, max_y, max_z): + await self.write( + f'p[{self.mapper.map_page(PAGE_SETTINGS_ABOUT)}].b[9].txt="{max_x}x{max_y}x{max_z}"' + ) + + async def display_thumbnail(self, page_number, thumbnail): + await self.write("vis cp0,1") + await self.write("p[" + str(page_number) + "].cp0.close()") + + parts = [] + start = 0 + end = 1024 + while start + 1024 < len(thumbnail): + parts.append(thumbnail[start:end]) + start = start + 1024 + end = end + 1024 + + parts.append(thumbnail[start : len(thumbnail)]) + for part in parts: + await self.write( + "p[" + str(page_number) + '].cp0.write("' + str(part) + '")' + ) + + async def hide_thumbnail(self): + await self.write("vis cp0,0") + + async def update_time_remaining(self, time_remaining): + await self.write( + f'p[{self.mapper.map_page(PAGE_PRINTING)}].b[37].txt="{time_remaining}"' + ) + + async def show_bed_mesh_final(self): + await self.update_kamp_text("Bed Mesh completed") + await self.write( + 'xstr 0,350,272,30,1,65535,10665,1,1,1,"Tap SAVE to update printer config and restart"' + ) + await self.write('xstr 40,400,200,50,1,65535,15319,1,1,1,"SAVE"') + + async def show_shutdown_screens(self): + await self.write("cls 44637") + await self.write( + 'xstr 8,220,256,20,1,54151,44637,1,1,1,"Please wait while your printer"' + ) + await self.write('xstr 24,240,224,20,1,54151,44637,1,1,1,"shuts down."') + await self.write("delay=10000") + await self.write("cls BLACK") + await self.write( + 'xstr 24,220,224,20,1,54150,0,1,1,1,"It\'s now safe to turn off"' + ) + await self.write('xstr 24,240,224,20,1,54150,0,1,1,1,"your printer."') diff --git a/src/elegoo_neptune3.py b/src/elegoo_neptune3.py new file mode 100644 index 0000000..9eedb7d --- /dev/null +++ b/src/elegoo_neptune3.py @@ -0,0 +1,29 @@ +from logging import Logger + +from src.elegoo_display import ElegooDisplayCommunicator, ElegooDisplayMapper + + +MODEL_N3_REGULAR = "N3" +MODEL_N3_PRO = "N3Pro" +MODEL_N3_PLUS = "N3Plus" +MODEL_N3_MAX = "N3Max" + +MODELS_N3 = [MODEL_N3_REGULAR, MODEL_N3_PRO, MODEL_N3_PLUS, MODEL_N3_MAX] + + +class Neptune3DisplayMapper(ElegooDisplayMapper): + pass + + +class Neptune3DisplayCommunicator(ElegooDisplayCommunicator): + def __init__( + self, + logger: Logger, + model: str, + event_handler, + port: str, + baudrate: int = 115200, + timeout: int = 5, + ) -> None: + super().__init__(logger, model, port, event_handler, baudrate, timeout) + self.mapper = Neptune3DisplayMapper() diff --git a/src/elegoo_neptune4.py b/src/elegoo_neptune4.py index bda08fa..671a7c3 100644 --- a/src/elegoo_neptune4.py +++ b/src/elegoo_neptune4.py @@ -1,5 +1,13 @@ from logging import Logger -from src.mapping import MappingLeaf, build_accessor, PAGE_MAIN, PAGE_PREPARE_TEMP, PAGE_PRINTING_FILAMENT, format_temp +from src.mapping import ( + MappingLeaf, + build_accessor, + PAGE_MAIN, + PAGE_PREPARE_TEMP, + PAGE_PRINTING_FILAMENT, + PAGE_SETTINGS_ABOUT, + format_temp, +) from src.elegoo_display import ElegooDisplayMapper, ElegooDisplayCommunicator MODEL_N4_REGULAR = "N4" @@ -7,6 +15,8 @@ MODEL_N4_PLUS = "N4Plus" MODEL_N4_MAX = "N4Max" +MODELS_N4 = [MODEL_N4_REGULAR, MODEL_N4_PRO, MODEL_N4_PLUS, MODEL_N4_MAX] + class Neptune4Mapper(ElegooDisplayMapper): pass @@ -76,13 +86,13 @@ def __init__(self) -> None: ), ], } - + self.set_filament_sensor_name("filament_sensor") class Neptune4PlusMapper(Neptune4Mapper): pass - + class Neptune4MaxMapper(Neptune4Mapper): pass @@ -93,13 +103,20 @@ def __init__( logger: Logger, model: str, event_handler, - port: str = "/dev/ttyS1", + port: str = None, baudrate: int = 115200, timeout: int = 5, ) -> None: - super().__init__(logger, port, event_handler, baudrate, timeout) - self.model = model + super().__init__( + logger, + model, + port if port else "/dev/ttyS1", + event_handler, + baudrate, + timeout, + ) self.mapper = self.get_mapper(model) + self.has_two_beds = model.lower() == MODEL_N4_PRO.lower() def get_mapper(self, model: str) -> Neptune4Mapper: if model.lower() == MODEL_N4_REGULAR.lower(): @@ -121,5 +138,40 @@ def get_mapper(self, model: str) -> Neptune4Mapper: self.model = MODEL_N4_REGULAR return Neptune4Mapper() + def get_device_name(self): + model_map = { + MODEL_N4_REGULAR: "Neptune 4", + MODEL_N4_PRO: "Neptune 4 Pro", + MODEL_N4_PLUS: "Neptune 4 Plus", + MODEL_N4_MAX: "Neptune 4 Max", + } + return model_map[self.model] + + async def initialize_display(self): + await self.write("sendxy=1") + model_image_key = None + if self.model == MODEL_N4_REGULAR: + model_image_key = "213" + elif self.model == MODEL_N4_PRO: + model_image_key = "214" + await self.write( + f"p[{self.mapper.map_page(PAGE_MAIN)}].disp_q5.val=1" + ) # N4Pro Outer Bed Symbol (Bottom Rig> + elif self.model == MODEL_N4_PLUS: + model_image_key = "313" + elif self.model == MODEL_N4_MAX: + model_image_key = "314" + + if self.display_name_override is None: + await self.write( + f"p[{self.mapper.map_page(PAGE_MAIN)}].q4.picc={model_image_key}" + ) + else: + await self.write(f"p[{self.mapper.map_page(PAGE_MAIN)}].q4.picc=137") + + await self.write( + f'p[{self.mapper.map_page(PAGE_SETTINGS_ABOUT)}].b[8].txt="{self.get_device_name()}"' + ) + def get_model(self) -> str: return self.model diff --git a/src/response_actions.py b/src/response_actions.py index a0f27fb..469dea2 100644 --- a/src/response_actions.py +++ b/src/response_actions.py @@ -1,4 +1,25 @@ -from src.mapping import PAGE_PREPARE_MOVE, PAGE_PREPARE_TEMP, PAGE_PREPARE_EXTRUDER, PAGE_SETTINGS, PAGE_SETTINGS_LANGUAGE, PAGE_SETTINGS_TEMPERATURE, PAGE_SETTINGS_ABOUT, PAGE_SETTINGS_ADVANCED, PAGE_LEVELING, PAGE_LEVELING_SCREW_ADJUST, PAGE_LEVELING_Z_OFFSET_ADJUST, PAGE_PRINTING_STOP, PAGE_PRINTING_EMERGENCY_STOP, PAGE_PRINTING_FILAMENT, PAGE_PRINTING_SPEED, PAGE_PRINTING_ADJUST, PAGE_PRINTING_DIALOG_SPEED, PAGE_PRINTING_DIALOG_FLOW, PAGE_LIGHTS, PAGE_SHUTDOWN_DIALOG +from src.mapping import ( + PAGE_PREPARE_MOVE, + PAGE_PREPARE_TEMP, + PAGE_PREPARE_EXTRUDER, + PAGE_SETTINGS, + PAGE_SETTINGS_LANGUAGE, + PAGE_SETTINGS_TEMPERATURE, + PAGE_SETTINGS_ABOUT, + PAGE_SETTINGS_ADVANCED, + PAGE_LEVELING, + PAGE_LEVELING_SCREW_ADJUST, + PAGE_LEVELING_Z_OFFSET_ADJUST, + PAGE_PRINTING_STOP, + PAGE_PRINTING_EMERGENCY_STOP, + PAGE_PRINTING_FILAMENT, + PAGE_PRINTING_SPEED, + PAGE_PRINTING_ADJUST, + PAGE_PRINTING_DIALOG_SPEED, + PAGE_PRINTING_DIALOG_FLOW, + PAGE_LIGHTS, + PAGE_SHUTDOWN_DIALOG, +) response_actions = { # Main @@ -22,6 +43,7 @@ 3: { 7: "page " + PAGE_LEVELING_SCREW_ADJUST, 8: "page " + PAGE_LEVELING_Z_OFFSET_ADJUST, + 9: "begin_full_bed_level", }, # Prepare Temperature (Pro Only) 6: { @@ -171,8 +193,8 @@ 5: "zoffset_-", 7: "page " + PAGE_LIGHTS, 8: "toggle_filament_sensor", - 12: "page " + PAGE_PRINTING_FILAMENT, - 13: "page " + PAGE_PRINTING_SPEED, + 9: "page " + PAGE_PRINTING_FILAMENT, + 10: "page " + PAGE_PRINTING_SPEED, }, # Printing Speed 135: { @@ -205,7 +227,7 @@ 6: { 0: "set_temp_extruder_$", 1: "set_temp_heater_bed_$", - 2: "set_temp_heater_bed_outer_$", + 10: "set_temp_heater_bed_outer_$", }, # Prepare Extruder 9: { @@ -235,7 +257,7 @@ (24, 104, 248, 154): "shutdown_host", (24, 158, 248, 208): "reboot_host", (24, 212, 248, 262): "reboot_klipper", - (0, 0, 272, 480): "go_back", - } + }, + "printing_kamp": {(40, 400, 230, 450): "save_config"}, } diff --git a/src/tjc.py b/src/tjc.py index 5ba1021..96443f2 100644 --- a/src/tjc.py +++ b/src/tjc.py @@ -11,6 +11,7 @@ TJCNumericInputPayload = namedtuple("Numeric", "page_id component_id value") TJCTouchCoordinatePayload = namedtuple("TouchCoordinate", "x y touch_event") + class EventType(IntEnum): TOUCH = 0x65 # Touch event TOUCH_COORDINATE = 0x67 # Touch coordinate diff --git a/src/wifi.py b/src/wifi.py index 7c8c33f..6c1c4c3 100644 --- a/src/wifi.py +++ b/src/wifi.py @@ -1,23 +1,28 @@ import subprocess # nosec + def get_wlan0_status(): try: # Get the SSID - ssid_output = subprocess.check_output(['nmcli', '-t', '-f', 'active,ssid', 'dev', 'wifi']).decode('utf-8') # nosec B603, B607 + ssid_output = subprocess.check_output( + ["nmcli", "-t", "-f", "active,ssid", "dev", "wifi"] + ).decode("utf-8") # nosec B603, B607 if len(ssid_output) == 0: return False, None, None ssid = None - for line in ssid_output.strip().split('\n'): - if line.startswith('yes:'): - ssid = line.split(':')[1] + for line in ssid_output.strip().split("\n"): + if line.startswith("yes:"): + ssid = line.split(":")[1] break # Get the signal strength - rssi_output = subprocess.check_output(['nmcli', '-f', 'in-use,signal', '-t', 'dev', 'wifi']).decode('utf-8') # nosec B603, B607 + rssi_output = subprocess.check_output( + ["nmcli", "-f", "in-use,signal", "-t", "dev", "wifi"] + ).decode("utf-8") # nosec B603, B607 rssi = None - for line in rssi_output.strip().split('\n'): - if line.startswith('*:'): - rssi = int(line.split(':')[1]) # Convert the string to an integer + for line in rssi_output.strip().split("\n"): + if line.startswith("*:"): + rssi = int(line.split(":")[1]) # Convert the string to an integer break # Categorize the signal strength @@ -28,6 +33,10 @@ def get_wlan0_status(): except subprocess.CalledProcessError: return False, None, None + except FileNotFoundError: + return False, None, None + + def categorize_signal_strength(signal_percentage): if signal_percentage is None: return 0 diff --git a/test_requirements.txt b/test_requirements.txt index 3481c12..30798e4 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -1,3 +1,5 @@ pytest +pytest-cov +pytest-asyncio ruff bandit \ No newline at end of file diff --git a/tests/communicator_test.py b/tests/communicator_test.py new file mode 100644 index 0000000..92f7a5e --- /dev/null +++ b/tests/communicator_test.py @@ -0,0 +1,30 @@ +import logging +from unittest.mock import AsyncMock +import pytest +from src.communicator import DisplayCommunicator + + +@pytest.mark.asyncio +async def test_valid_version(): + communicator = DisplayCommunicator(logging, None, None, None) + communicator.get_firmware_version = AsyncMock(return_value="1.0") + communicator.supported_firmware_versions = ["1.0"] + is_valid = await communicator.check_valid_version() + communicator.get_firmware_version.assert_awaited_once() + assert is_valid + +@pytest.mark.asyncio +async def test_invalid_version(): + communicator = DisplayCommunicator(logging, None, None, None) + communicator.get_firmware_version = AsyncMock(return_value="1.0") + communicator.supported_firmware_versions = ["2.0"] + is_valid = await communicator.check_valid_version() + communicator.get_firmware_version.assert_awaited_once() + assert not is_valid + +@pytest.mark.asyncio +async def test_navigate(): + communicator = DisplayCommunicator(logging, None, None, None) + communicator.write = AsyncMock() + await communicator.navigate_to("1") + communicator.write.assert_awaited_once_with("page 1") \ No newline at end of file diff --git a/tests/config_test.py b/tests/config_test.py index 303f699..9ff846f 100644 --- a/tests/config_test.py +++ b/tests/config_test.py @@ -7,33 +7,61 @@ logger = logging.getLogger(__name__) + def test_initializing(tmp_path): - config = ConfigHandler(str(tmp_path) + '/test_config.ini', logger) - assert config.file == str(tmp_path) + '/test_config.ini' - assert os.path.exists(str(tmp_path) + '/test_config.ini') + config = ConfigHandler(str(tmp_path) + "/test_config.ini", logger) + assert config.file == str(tmp_path) + "/test_config.ini" + assert os.path.exists(str(tmp_path) + "/test_config.ini") + def test_reload_config(tmp_path): - config = ConfigHandler(str(tmp_path) + '/test_config.ini', logger) + config = ConfigHandler(str(tmp_path) + "/test_config.ini", logger) with pytest.raises(NoSectionError): - config.get('test', 'test') - with open(str(tmp_path) + '/test_config.ini', 'w') as f: - f.write('[test]\ntest = test2') + config.get("test", "test") + with open(str(tmp_path) + "/test_config.ini", "w") as f: + f.write("[test]\ntest = test2") config.reload_config() - assert config.get('test', 'test') == 'test2' + assert config.get("test", "test") == "test2" + def test_write_changes(tmp_path): - config = ConfigHandler(str(tmp_path) + '/test_config.ini', logger) - with open(str(tmp_path) + '/test_config.ini', 'r') as f: - assert '[test]\ntest = test' not in f.read() - config.add_section('test') - config.set('test', 'test', 'test') + config = ConfigHandler(str(tmp_path) + "/test_config.ini", logger) + with open(str(tmp_path) + "/test_config.ini", "r") as f: + assert "[test]\ntest = test" not in f.read() + config.add_section("test") + config.set("test", "test", "test") config.write_changes() - with open(str(tmp_path) + '/test_config.ini', 'r') as f: - assert '[test]\ntest = test' in f.read() + with open(str(tmp_path) + "/test_config.ini", "r") as f: + assert "[test]\ntest = test" in f.read() + def test_does_not_overwrite_existing_config(tmp_path): - with open(str(tmp_path) + '/test_config.ini', 'w') as f: - f.write('[test]\ntest = test') - ConfigHandler(str(tmp_path) + '/test_config.ini', logger) - with open(str(tmp_path) + '/test_config.ini', 'r') as f: - assert '[test]\ntest = test' in f.read() \ No newline at end of file + with open(str(tmp_path) + "/test_config.ini", "w") as f: + f.write("[test]\ntest = test") + ConfigHandler(str(tmp_path) + "/test_config.ini", logger) + with open(str(tmp_path) + "/test_config.ini", "r") as f: + assert "[test]\ntest = test" in f.read() + +def test_safe_get(tmp_path): + with open(str(tmp_path) + "/test_config.ini", "w") as f: + f.write("[test]\nt = t") + config = ConfigHandler(str(tmp_path) + "/test_config.ini", logger) + + assert config.safe_get("test", "t") == "t" + assert config.safe_get("test", "t", "default") == "t" + +def test_safe_get_no_section(tmp_path): + with open(str(tmp_path) + "/test_config.ini", "w") as f: + f.write("[test2]\nt = t") + config = ConfigHandler(str(tmp_path) + "/test_config.ini", logger) + + assert config.safe_get("test", "t2") is None + assert config.safe_get("test", "t2", "default") == "default" + +def test_safe_get_no_option(tmp_path): + with open(str(tmp_path) + "/test_config.ini", "w") as f: + f.write("[test]\nt = t") + config = ConfigHandler(str(tmp_path) + "/test_config.ini", logger) + + assert config.safe_get("test", "t2") is None + assert config.safe_get("test", "t2", "default") == "default" diff --git a/tests/elegoo_display_test.py b/tests/elegoo_display_test.py new file mode 100644 index 0000000..3c88556 --- /dev/null +++ b/tests/elegoo_display_test.py @@ -0,0 +1,7 @@ +from src.elegoo_display import ElegooDisplayMapper + + +def test_initializing(): + mapper = ElegooDisplayMapper() + assert mapper.page_mapping is not None + assert mapper.data_mapping is not None diff --git a/tests/elegoo_neptune4_test.py b/tests/elegoo_neptune4_test.py new file mode 100644 index 0000000..36abb5f --- /dev/null +++ b/tests/elegoo_neptune4_test.py @@ -0,0 +1,86 @@ +import logging +from unittest.mock import AsyncMock, call + +import pytest +from src.elegoo_neptune4 import MODEL_N4_MAX, MODEL_N4_PLUS, MODEL_N4_PRO, MODEL_N4_REGULAR, Neptune4DisplayCommunicator, Neptune4Mapper, Neptune4MaxMapper, Neptune4PlusMapper, Neptune4ProMapper +from src.mapping import PAGE_PREPARE_TEMP, PAGE_PRINTING_FILAMENT + + +def test_n4_pro_mapping(): + mapper = Neptune4ProMapper() + assert mapper.map_page(PAGE_PREPARE_TEMP) == "6" + assert mapper.map_page(PAGE_PRINTING_FILAMENT) == "27" + + +def test_get_mapper_4(): + communicator = Neptune4DisplayCommunicator(None, MODEL_N4_REGULAR, None) + assert isinstance(communicator.mapper, Neptune4Mapper) + + +def test_get_mapper_pro(): + communicator = Neptune4DisplayCommunicator(None, MODEL_N4_PRO, None) + assert isinstance(communicator.mapper, Neptune4ProMapper) + + +def test_get_mapper_plus(): + communicator = Neptune4DisplayCommunicator(None, MODEL_N4_PLUS, None) + assert isinstance(communicator.mapper, Neptune4PlusMapper) + + +def test_get_mapper_max(): + communicator = Neptune4DisplayCommunicator(None, MODEL_N4_MAX, None) + assert isinstance(communicator.mapper, Neptune4MaxMapper) + + +def test_get_mapper_invalid(): + communicator = Neptune4DisplayCommunicator(logging, "abc", None) + assert isinstance(communicator.mapper, Neptune4Mapper) + + +@pytest.mark.asyncio +async def test_initializing(): + communicator = Neptune4DisplayCommunicator(logging, MODEL_N4_REGULAR, None) + communicator.display.command = AsyncMock() + await communicator.initialize_display() + assert communicator.mapper is not None + communicator.display.command.assert_has_calls([ + call("sendxy=1", 5), + call("p[1].q4.picc=213", 5) + ]) + + +@pytest.mark.asyncio +async def test_initializing_pro(): + communicator = Neptune4DisplayCommunicator(logging, MODEL_N4_PRO, None) + communicator.display.command = AsyncMock() + await communicator.initialize_display() + assert communicator.mapper is not None + communicator.display.command.assert_has_calls([ + call("sendxy=1", 5), + call('p[1].disp_q5.val=1', 5), + call("p[1].q4.picc=214", 5) + ]) + + +@pytest.mark.asyncio +async def test_initializing_plus(): + communicator = Neptune4DisplayCommunicator(logging, MODEL_N4_PLUS, None) + communicator.display.command = AsyncMock() + await communicator.initialize_display() + assert communicator.mapper is not None + communicator.display.command.assert_has_calls([ + call("sendxy=1", 5), + call("p[1].q4.picc=313", 5) + ]) + + +@pytest.mark.asyncio +async def test_initializing_max(): + communicator = Neptune4DisplayCommunicator(logging, MODEL_N4_MAX, None) + communicator.display.command = AsyncMock() + await communicator.initialize_display() + assert communicator.mapper is not None + communicator.display.command.assert_has_calls([ + call("sendxy=1", 5), + call("p[1].q4.picc=314", 5) + ]) \ No newline at end of file diff --git a/tests/mapping_test.py b/tests/mapping_test.py index 39caeda..47a75e0 100644 --- a/tests/mapping_test.py +++ b/tests/mapping_test.py @@ -1,11 +1,20 @@ -from src.mapping import format_temp, format_time, format_percent, build_format_filename, build_accessor +from src.mapping import ( + format_temp, + format_time, + format_percent, + build_format_filename, + build_accessor, +) + def test_format_temp(): assert format_temp(25.5) == "25.5°C" + def test_format_temp_none(): assert format_temp(None) == "N/A" + def test_format_time(): assert format_time(3660) == "01h 01m" assert format_time(60) == "01m 00s" @@ -13,17 +22,20 @@ def test_format_time(): assert format_time(3600) == "01h 00m" assert format_time(None) == "N/A" + def test_format_percent(): assert format_percent(0.5) == "50%" assert format_percent(0.123) == "12%" assert format_percent(1) == "100%" assert format_percent(None) == "N/A" + def test_format_default_filename(): format_filename = build_format_filename() assert format_filename("test.gcode") == "test" assert format_filename("test_pla_0.2mm_printer_1h.gcode") == "test_pla_0.2mm" + def test_format_printing_filename(): format_filename = build_format_filename("printing") assert format_filename("test.gcode") == "test" @@ -32,8 +44,9 @@ def test_format_printing_filename(): assert format_filename("test_pla_0.2mm_printer_1s.gcode") == "test" assert format_filename("test_pla_0.2mm_printer_1h1m1s.gcode") == "test" + def test_build_accessor(): assert build_accessor(5, 0) == "p[5].b[0]" assert build_accessor(10, "test") == "p[10].test" assert build_accessor("test", 0) == "test.b[0]" - assert build_accessor("testp", "testm") == "testp.testm" \ No newline at end of file + assert build_accessor("testp", "testm") == "testp.testm" diff --git a/tests/tjc_test.py b/tests/tjc_test.py new file mode 100644 index 0000000..9106c9d --- /dev/null +++ b/tests/tjc_test.py @@ -0,0 +1,73 @@ +from unittest.mock import MagicMock + +import pytest +from src.tjc import EventType, TJCClient, TJCNumericInputPayload, TJCProtocol, TJCTouchCoordinatePayload, TJCTouchDataPayload + + +def test_is_event(): + protocol = TJCProtocol(None) + assert protocol.is_event(b"\x65") is True + assert protocol.is_event(b"\x71") is False + assert protocol.is_event(b"\x72") is True + + +def test_data_received_touch_event(): + m = MagicMock() + protocol = TJCProtocol(m) + protocol.data_received(b"\x65\xF1\x02\xFF\xFF\xFF\xFF") + m.assert_called_once_with(b"\x65\xF1\x02") + + +def test_data_received_number(): + m = MagicMock() + protocol = TJCProtocol(m) + protocol.data_received(b"\x72\xF1\x02\x03\x05\x04\xFF\xFF\xFF") + m.assert_called_once_with(b"\x72\xF1\x02\x03\x05\x04") + + +def test_data_received_number_broken(): + m = MagicMock() + protocol = TJCProtocol(m) + protocol.data_received(b"\x71\xF1\x02\x03\x05") + m.assert_called_once_with(b"\x72\xF1\x02\x03\x05") + + +@pytest.mark.asyncio +async def test_data_received_variable(): + m = MagicMock() + protocol = TJCProtocol(m) + protocol.data_received(b"\x70\x10\x20\x30\x40\xFF\xFF\xFF") + item = await protocol.queue.get() + assert item == b"\x70\x10\x20\x30\x40" + + +def test_event_touch_coordinate(): + m = MagicMock() + client = TJCClient(None) + client._schedule_event_message_handler = m + client.event_message_handler(b"\x67\xF1\x02\x30\x20\x01") + m.assert_called_once_with(EventType.TOUCH_COORDINATE, TJCTouchCoordinatePayload(x=61698, y=12320, touch_event=1)) + + +def test_event_touch(): + m = MagicMock() + client = TJCClient(None) + client._schedule_event_message_handler = m + client.event_message_handler(b"\x65\x02\x15") + m.assert_called_once_with(EventType.TOUCH, TJCTouchDataPayload(page_id=2, component_id=21)) + + +def test_numeric_input(): + m = MagicMock() + client = TJCClient(None) + client._schedule_event_message_handler = m + client.event_message_handler(b"\x72\x03\x15\x10\x08") + m.assert_called_once_with(EventType.NUMERIC_INPUT, TJCNumericInputPayload(page_id=3, component_id=21, value=2064)) + + +def test_event_slider(): + m = MagicMock() + client = TJCClient(None) + client._schedule_event_message_handler = m + client.event_message_handler(b"\x72\x03\x16\x10\x09") + m.assert_called_once_with(EventType.NUMERIC_INPUT, TJCNumericInputPayload(page_id=3, component_id=22, value=2320))