セクションの複数ページをまとめています。 印刷またはPDF形式で保存...

もとのページに戻る

2025-08-01 現在

Python による子機との通信(グラフィカルUI編)

Python を使って子機と通信するためのグラフィカルUIを備えたアプリケーションを構築する
    実用に近い応用編として、簡単なグラフィカルインタフェースを備えたアプリケーションを作成します。

    TWELTIE DIP と通信する

    TWELITE DIP と通信するアプリケーションを、簡単なユーザインタフェースとともに構築してみましょう。

    画面の表示例

    画面の表示例

    TWELITE DIP を対象とした基本編のスクリプトでは DI1DO1 の簡単な操作に留めましたが、実際にモニタリングを行うためのアプリケーションは、TWELITE STAGE APPのようなユーザインタフェースを必要とします。

    Dear PyGui について

    ここでは、Dear PyGuiMWings ライブラリを組み合わせて使います。

    TWELITE DIP の配線と起動

    このアプリケーションでは、TWELITE DIP の DIxAIx に入力された信号を受信すると同時に、DOxPWMx へ信号を出力させます。

    VCCGND のほか、以下に示す16のピンを任意に接続してください。

    種別ピン1234範囲備考
    デジタル入力DIx#15#16#17#180.0V - VCC内部プルアップ
    アナログ入力AIx#22#23#24#250.0V - 2.0V超過で無効扱い
    デジタル出力DOx#5#8#9#120.0V - VCCLEDカソード接続
    PWM出力PWMx#4#6#7#110% - 100%電圧はVCCレベル

    スクリプトの作成と実行

    モジュールの導入

    mwings(またはmwingslite)およびdearpygui モジュールを導入します。

    
    
    pip install mwings dearpygui

    スクリプトの作成

    後述のスクリプト stick_dip_gui.py を作成してください。mwings.parsers.app_tweliteを使ってデータを受信し、mwings.serializers.app_tweliteを使ってデータを送信する機能をdearpygui経由で提供します。

    # -*- coding:utf-8 -*-
    
    from time import perf_counter
    from enum import IntEnum, auto
    from zoneinfo import ZoneInfo
    from typing import Any, Self, final
    from types import TracebackType
    from pathlib import Path
    from collections import deque
    
    import mwings as mw
    import dearpygui.dearpygui as dpg  # type: ignore
    
    
    PLOT_UPDATE_INTERVAL = 1.0 / 100  # 100Hz
    PLOT_X_LIMIT = 5.0  # 3 seconds
    
    FONT = "Mplus1Code-Medium.otf"
    
    # MARK: MainViewport
    
    
    @final
    class MainViewport:
        @final
        class State(IntEnum):
            """Application state enumeration."""
    
            IDLE = auto()
            CONNECTING = auto()
    
        selected_port: str
        twelite: mw.Twelite
    
        command_app_twelite: mw.serializers.app_twelite.Command
    
        pwm_data: dict[int, int]
        pwm_plot_data: dict[int, deque[tuple[float, int]]]
        last_plot: float
    
        state: State
        theme_disabled: int
    
        def __init__(self) -> None:
            """Initialize the main window and prepare all components."""
    
            self.initialize_viewport()
            self.create_themes()
            self.create_windows()
            self.initialize()
            self.update_state(self.State.IDLE)
    
        def create_themes(self) -> None:
            """Create a visual theme for disabled UI elements."""
    
            with dpg.theme() as self.theme_disabled:
                with dpg.theme_component(dpg.mvAll):
                    dpg.add_theme_color(dpg.mvThemeCol_Text, (150, 150, 150, 255))
                    dpg.add_theme_color(dpg.mvThemeCol_TextDisabled, (100, 100, 100, 255))
                    dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (5, 5, 5, 255))
    
        # MARK: Initialize Viewport
    
        def initialize_viewport(self) -> None:
            """Set up Dear PyGui context, fonts, and viewport."""
    
            dpg.create_context()
            with dpg.font_registry():
                font_path = str(Path(__file__).parent / FONT)
                with dpg.font(file=font_path, size=18) as default_font:
                    dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
                dpg.bind_font(default_font)
            dpg.create_viewport(
                title="TWELITE STICK", width=620, height=440, resizable=True
            )
            dpg.set_viewport_vsync(False)
            dpg.setup_dearpygui()
            dpg.show_viewport()
    
        def create_windows(self) -> None:
            """Create and configure all Dear PyGui windows and their contents."""
    
            # MARK: Panel Window
            with dpg.window(
                no_title_bar=True,
                no_close=True,
                no_collapse=True,
                no_move=True,
                no_resize=True,
                pos=(10, 10),
                width=600,
                height=100,
            ):
                ports = mw.utils.get_ports()
                with dpg.group(horizontal=True):
                    with dpg.child_window(width=300):
                        dpg.add_text("シリアルポート")
                        dpg.add_combo(
                            items=ports,
                            default_value=ports[0],
                            tag="ports_combo",
                            width=280,
                            enabled=True,
                            callback=self.on_select_port,
                        )
                    with dpg.child_window(width=150):
                        dpg.add_text("シリアル通信")
                        with dpg.group(horizontal=True):
                            dpg.add_button(
                                label="接続",
                                tag="start_button",
                                enabled=True,
                                callback=self.on_start,
                            )
                            dpg.add_button(
                                label="切断",
                                tag="stop_button",
                                enabled=False,
                                callback=self.on_stop,
                            )
    
            # MARK: Input Window
            with dpg.window(
                label="TWELITE DIP 入力",
                no_close=True,
                # no_resize=True,
                pos=(10, 120),
                width=600,
                height=160,
                tag="window_twelite_dip_input",
            ):
                with dpg.group(horizontal=False):
                    # DOx
                    with dpg.group(horizontal=True):
                        for p in range(1, 5):
                            with dpg.group(horizontal=True):
                                with dpg.drawlist(
                                    width=20, height=20, tag=f"indicator_do{p}"
                                ):
                                    dpg.draw_circle(
                                        center=(10, 10),
                                        radius=10,
                                        fill=(0, 0, 0, 255),
                                        tag=f"circle_do{p}",
                                    )
                                dpg.add_text(f"DO{p}")
                    dpg.add_spacer()
                    dpg.add_separator()
                    dpg.add_spacer()
                    # PWMx
                    with dpg.group(horizontal=True):
                        for p in range(1, 5):
                            with dpg.plot(
                                label=f"PWM{p}",
                                height=80,
                                width=140,
                                tag=f"plot_pwm{p}",
                                no_frame=True,
                                no_mouse_pos=True,
                            ):
                                dpg.add_plot_axis(
                                    dpg.mvXAxis,
                                    tag=f"plot_pwm{p}_x",
                                    no_tick_labels=True,
                                )
                                with dpg.plot_axis(
                                    dpg.mvYAxis,
                                    tag=f"plot_pwm{p}_y",
                                    no_tick_labels=True,
                                    no_tick_marks=True,
                                    no_gridlines=True,
                                ):
                                    dpg.add_line_series(
                                        [],
                                        [],
                                        tag=f"series_pwm{p}",
                                        parent=f"plot_pwm{p}_y",
                                    )
    
            # MARK: Output Window
            with dpg.window(
                label="TWELITE DIP 出力",
                no_close=True,
                no_resize=True,
                pos=(10, 290),
                width=600,
                height=140,
                tag="window_twelite_dip_output",
            ):
                with dpg.group(horizontal=False):
                    # DIx
                    with dpg.group(horizontal=True):
                        for p in range(1, 5):
                            dpg.add_checkbox(
                                label=f"DI{p}",
                                tag=f"checkbox_di{p}",
                                enabled=False,
                                callback=self.on_check_di,
                                user_data=p,
                            )
                    dpg.add_spacer()
                    dpg.add_separator()
                    dpg.add_spacer()
                    # AIx
                    with dpg.group(horizontal=False):
                        with dpg.group(horizontal=True):
                            for p in range(1, 5):
                                with dpg.group(horizontal=False):
                                    dpg.add_text(f"AI{p}")
                                    dpg.add_slider_int(
                                        label="",
                                        width=80,
                                        default_value=0,
                                        min_value=0,
                                        max_value=2000,
                                        tag=f"slider_ai{p}",
                                        enabled=False,
                                        callback=self.on_change_ai,
                                        user_data=p,
                                    )
    
        def update_state(self, new_state: State) -> None:
            """Update the UI and internal state based on the application's current status."""
    
            # MARK: UI State
            self.state = new_state
            match new_state:
                case self.State.IDLE:
                    dpg.configure_item("ports_combo", enabled=True)
                    dpg.configure_item("start_button", enabled=True)
                    dpg.configure_item("stop_button", enabled=False)
                    for p in range(1, 5):
                        dpg.configure_item(f"checkbox_di{p}", enabled=False)
                        dpg.configure_item(f"slider_ai{p}", enabled=False)
                    dpg.bind_item_theme("window_twelite_dip_input", self.theme_disabled)
                    dpg.bind_item_theme("window_twelite_dip_output", self.theme_disabled)
                case self.State.CONNECTING:
                    dpg.configure_item("ports_combo", enabled=False)
                    dpg.configure_item("start_button", enabled=False)
                    dpg.configure_item("stop_button", enabled=True)
                    for p in range(1, 5):
                        dpg.configure_item(f"checkbox_di{p}", enabled=True)
                        dpg.configure_item(f"slider_ai{p}", enabled=False)
                    dpg.bind_item_theme("window_twelite_dip_input", 0)
                    dpg.bind_item_theme("window_twelite_dip_output", 0)
    
        # MARK: UI Handlers
    
        def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
            """Handle serial port selection from the combo box."""
    
            self.selected_port = app_data
    
        def on_start(self, sender: Any, app_data: str, user_data: Any) -> None:
            """Handle the start button click and initiate communication."""
    
            self.start()
    
        def on_stop(self, sender: Any, app_data: str, user_data: Any) -> None:
            """Handle the stop button click and terminate communication."""
    
            self.update_state(self.State.IDLE)
            self.twelite.close()
    
        def on_check_di(self, sender: Any, app_data: bool, user_data: int) -> None:
            """Handle checkbox state changes for DI output control."""
    
            if 1 <= user_data <= 4:
                self.command_app_twelite.di_state[user_data - 1] = app_data
                self.twelite.send(self.command_app_twelite)
    
        def on_change_ai(self, sender: Any, app_data: int, user_data: int) -> None:
            """Handle slider changes for AI values and send PWM updates."""
    
            if 1 <= user_data <= 4:
                self.command_app_twelite.pwm_duty[user_data - 1] = app_data * 1024 // 2000
                self.twelite.send(self.command_app_twelite)
    
        # MARK: Packet handler
    
        def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
            """Update the GUI based on incoming TWELITE DIP packet data."""
    
            now = perf_counter()
            for p in range(1, 5):
                # DO
                p_state = packet.di_state[p - 1]
                match p:
                    case 1:  # RED
                        color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                    case 2:  # GREEN
                        color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                    case 3:  # YELLOW
                        color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                    case 4:  # BLUE
                        color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
                dpg.configure_item(f"circle_do{p}", fill=color)
                # PWM
                self.pwm_data[p] = packet.ai_voltage[p - 1]
                self.pwm_plot_data[p].append((now, self.pwm_data[p]))
    
        # MARK: Plot
    
        def update_plot(self) -> None:
            """Update the plot with the latest PWM data."""
    
            now = perf_counter()
            # Throttle plot updates to PLOT_UPDATE_INTERVAL
            if now - self.last_plot < PLOT_UPDATE_INTERVAL:
                return
            self.last_plot = now
            for p in range(1, 5):
                # If no new value was added recently, duplicate the last value
                if self.pwm_plot_data[p]:
                    last_time, last_val = self.pwm_plot_data[p][-1]
                    if now - last_time > PLOT_UPDATE_INTERVAL:
                        self.pwm_plot_data[p].append((now, last_val))
                else:
                    self.pwm_plot_data[p].append((now, self.pwm_data[p]))
                # Remove old values older than PLOT_X_LIMIT
                while (
                    self.pwm_plot_data[p]
                    and now - self.pwm_plot_data[p][0][0] > PLOT_X_LIMIT
                ):
                    self.pwm_plot_data[p].popleft()
                x_series, y_series = (
                    zip(*list(self.pwm_plot_data[p])) if self.pwm_plot_data[p] else ([], [])
                )
                if x_series and y_series:
                    dpg.set_value(f"series_pwm{p}", [x_series, y_series])
                    dpg.set_axis_limits(f"plot_pwm{p}_x", x_series[0], x_series[-1])
                    dpg.set_axis_limits(f"plot_pwm{p}_y", 0, 2000)
    
        # MARK: Initialize Variables
    
        def initialize(self) -> None:
            """Initialize internal command structure and default values."""
    
            ports = mw.utils.get_ports()
            self.selected_port = ports[0]  # default
    
            command_app_twelite_initial: dict[str, Any] = {
                "destination_logical_id": 0x78,  # All child devices
                "di_to_change": [True, True, True, True],  # Enable DI1-4
                "di_state": [False, False, False, False],  # Initial state of DIx
                "pwm_to_change": [True, True, True, True],  # Enable AI1-4
                "pwm_duty": [0, 0, 0, 0],  # Initial state of AIx
            }
            self.command_app_twelite = mw.serializers.app_twelite.Command(
                **command_app_twelite_initial
            )
    
            self.pwm_data = {1: 0, 2: 0, 3: 0, 4: 0}
            self.pwm_plot_data = {1: deque(), 2: deque(), 3: deque(), 4: deque()}
            self.last_plot = 0.0
    
        # MARK: Start
    
        def start(self) -> None:
            """Start TWELITE communication and register listeners."""
    
            # Create a twelite object
            self.twelite = mw.Twelite(self.selected_port)
            # Use JST for received data
            self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
            # Register event handler(s)
            self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)
    
            self.update_state(self.State.CONNECTING)
    
        # MARK: loop
    
        def loop(self) -> None:
            """Perform periodic updates while the application is running."""
    
            match self.state:
                case self.State.IDLE:
                    pass
                case self.State.CONNECTING:
                    self.twelite.update()
                    self.update_plot()
            dpg.render_dearpygui_frame()
    
        # MARK: Show
    
        def show(self) -> None:
            """Main execution loop of the application."""
    
            try:
                while dpg.is_dearpygui_running():
                    self.loop()
            except KeyboardInterrupt:
                pass
            finally:
                print("Quit...")
                self.close()
    
        # MARK: Lifecycle
    
        def __enter__(self) -> Self:
            """Enter the runtime context related to this object."""
    
            return self
    
        def __exit__(
            self,
            exc_type: type[BaseException] | None,
            exc_value: BaseException | None,
            traceback: TracebackType | None,
        ) -> bool | None:
            """Exit the runtime context and close the connection."""
    
            self.close()
            return None
    
        def __del__(self) -> None:
            """Destructor to ensure resources are cleaned up."""
    
            self.close()
    
        def close(self) -> None:
            """Cleanly shut down TWELITE connection and Dear PyGui context."""
    
            if self.state == self.State.CONNECTING:
                self.twelite.close()
            dpg.destroy_context()
    
    
    # MARK: Entry point
    
    if __name__ == "__main__":
        with MainViewport() as viewport:
            viewport.show()

    スクリプトの実行

    スクリプトを実行します。

    
    
    python stick_dip_gui.py
    1. 「シリアルポート」のコンボボックスから、使用するポート(COM3, /dev/tty.usb*など)を選択
    2. 「シリアル通信」の「接続」ボタンを押して通信を開始
    3. TWELITE DIP の入力を変化させて、「TWELITE DIP 入力」ウィンドウの変化を確認
    4. 「TWELITE DIP 出力」ウィンドウのチェックボックスやスライダーを変化させて、TWELITE DIP の出力の変化を確認
    5. 「シリアル通信」の「切断」ボタンを押して通信を終了
    6. ウィンドウを閉じる
    表示例

    表示例

    正常に通信できていれば、TWELITE DIP の状態をリアルタイムに表示しつつ、TWELITE DIP の出力を任意のタイミングで変更できるはずです。

    スクリプトの解説

    Dear PyGui の初期化は initialize_viewport() が行っています。

    フォントファイルや、OSのウィンドウ(Viewportと呼ぶ)の大きさはここで登録しています。

        def initialize_viewport(self) -> None:
            """Set up Dear PyGui context, fonts, and viewport."""
    
            dpg.create_context()
            with dpg.font_registry():
                font_path = str(Path(__file__).parent / FONT)
                with dpg.font(file=font_path, size=18) as default_font:
                    dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
                dpg.bind_font(default_font)
            dpg.create_viewport(
                title="TWELITE STICK", width=620, height=440, resizable=True
            )
            dpg.set_viewport_vsync(False)
            dpg.setup_dearpygui()
            dpg.show_viewport()

    OSのウィンドウ内のインタフェースは、create_windows() に定義しています。

    画面内の子ウィンドウは dpg.window()によって定義され、配下のコンポーネントを定義していきます。

    コンポーネントには tag を登録することで、値や属性をプログラムから変更できます(HTMLのid=のようなイメージ)。

        def create_windows(self) -> None:
            """Create and configure all Dear PyGui windows and their contents."""
    
            # MARK: Panel Window
            with dpg.window(
                no_title_bar=True,
                no_close=True,
                no_collapse=True,
                no_move=True,
                no_resize=True,
                pos=(10, 10),
                width=600,
                height=100,
            ):
                ports = mw.utils.get_ports()
                with dpg.group(horizontal=True):
                    with dpg.child_window(width=300):
                        dpg.add_text("シリアルポート")
                        dpg.add_combo(
                            items=ports,
                            default_value=ports[0],
                            tag="ports_combo",
                            width=280,
                            enabled=True,
                            callback=self.on_select_port,
                        )
                    ...

    また、入力を受け付けるボタンやコンボボックス、スライダーやチェックボックスでは、callback を登録できます。

        def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
            """Handle serial port selection from the combo box."""
    
            self.selected_port = app_data

    ほかのスクリプトと同様に、TWELITE の子機から受信したデータは、イベントハンドラで受け取ることができます。

    on_app_twelite() では、DOxに反映するデータを tag によって即座に登録し、PWMxに反映するデータはメンバ変数を通じて update_plot() へ渡しています。

        def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
            """Update the GUI based on incoming TWELITE DIP packet data."""
    
            now = perf_counter()
            for p in range(1, 5):
                # DO
                p_state = packet.di_state[p - 1]
                match p:
                    case 1:  # RED
                        color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                    case 2:  # GREEN
                        color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                    case 3:  # YELLOW
                        color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                    case 4:  # BLUE
                        color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
                dpg.configure_item(f"circle_do{p}", fill=color)
                # PWM
                self.pwm_data[p] = packet.ai_voltage[p - 1]
                self.pwm_plot_data[p].append((now, self.pwm_data[p]))

    「開始」ボタンを押したときは、start() が呼び出されます。

    ここで mwings.Twelite を初期化し、受信ハンドラを登録しています。

        def start(self) -> None:
            """Start TWELITE communication and register listeners."""
    
            # Create a twelite object
            self.twelite = mw.Twelite(self.selected_port)
            # Use JST for received data
            self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
            # Register event handler(s)
            self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)
    
            self.update_state(self.State.CONNECTING)

    Dear PyGui は低レベルな API を使って構築されているため、各フレームの描画をアプリケーションから指示し、描画ループを自前で定義することができます。OS のウィンドウが表示されている間、loop()は無限に呼び出されます。

    シリアルポートへ接続したのち、loop() が行う処理は3つあります。

        def loop(self) -> None:
            """Perform periodic updates while the application is running."""
    
            match self.state:
                case self.State.IDLE:
                    pass
                case self.State.CONNECTING:
                    self.twelite.update()
                    self.update_plot()
            dpg.render_dearpygui_frame()

    画面全体の処理は、show() を呼び出すことで開始されます。

        def show(self) -> None:
            """Main execution loop of the application."""
    
            try:
                while dpg.is_dearpygui_running():
                    self.loop()
            except KeyboardInterrupt:
                pass
            finally:
                print("Quit...")
                self.close()

    プログラムは、画面を閉じる dpg.is_dearpygui_running()== False か、Ctrl+Cを入力する KeyboardInterrupt ことで描画ループを抜け、mwings.Twelitedearpygui のクローズ処理を経て終了します。