/      English

Python による子機との通信(グラフィカルUI編)

Python を使って子機と通信するためのグラフィカルUIを備えたアプリケーションを構築する
実用に近い応用編として、簡単なグラフィカルインタフェースを備えたアプリケーションを作成します。

TWELTIE DIP と通信する

TWELITE DIP と通信するアプリケーションを、簡単なユーザインタフェースとともに構築してみましょう。

画面の表示例

画面の表示例

TWELITE DIP を対象とした基本編のスクリプトでは DI1DO1 の簡単な操作に留めましたが、実際にモニタリングを行うためのアプリケーションは、TWELITE STAGE APPのようなユーザインタフェースを必要とします。

Dear PyGui について

ここでは、Dear PyGuiMWings ライブラリを組み合わせて使います。

TWELITE DIP の配線と起動

このアプリケーションでは、TWELITE DIP の DIxAIx に入力された信号を受信すると同時に、DOxPWMx へ信号を出力させます。

VCCGND のほか、以下に示す16のピンを任意に接続してください。

種別ピン1234範囲備考
デジタル入力DIx#15#16#17#180.0V - VCC内部プルアップ
アナログ入力AIx#22#23#24#250.0V - 2.0V超過で無効扱い
デジタル出力DOx#5#8#9#120.0V - VCCLEDカソード接続
PWM出力PWMx#4#6#7#110% - 100%電圧はVCCレベル

スクリプトの作成と実行

モジュールの導入

mwings(またはmwingslite)およびdearpygui モジュールを導入します。


pip install mwings dearpygui

スクリプトの作成

後述のスクリプト stick_dip_gui.py を作成してください。mwings.parsers.app_tweliteを使ってデータを受信し、mwings.serializers.app_tweliteを使ってデータを送信する機能をdearpygui経由で提供します。

# -*- coding:utf-8 -*-

from time import perf_counter
from enum import IntEnum, auto
from zoneinfo import ZoneInfo
from typing import Any, Self, final
from types import TracebackType
from pathlib import Path
from collections import deque

import mwings as mw
import dearpygui.dearpygui as dpg  # type: ignore


PLOT_UPDATE_INTERVAL = 1.0 / 100  # 100Hz
PLOT_X_LIMIT = 5.0  # 3 seconds

FONT = "Mplus1Code-Medium.otf"

# MARK: MainViewport


@final
class MainViewport:
    @final
    class State(IntEnum):
        """Application state enumeration."""

        IDLE = auto()
        CONNECTING = auto()

    selected_port: str
    twelite: mw.Twelite

    command_app_twelite: mw.serializers.app_twelite.Command

    pwm_data: dict[int, int]
    pwm_plot_data: dict[int, deque[tuple[float, int]]]
    last_plot: float

    state: State
    theme_disabled: int

    def __init__(self) -> None:
        """Initialize the main window and prepare all components."""

        self.initialize_viewport()
        self.create_themes()
        self.create_windows()
        self.initialize()
        self.update_state(self.State.IDLE)

    def create_themes(self) -> None:
        """Create a visual theme for disabled UI elements."""

        with dpg.theme() as self.theme_disabled:
            with dpg.theme_component(dpg.mvAll):
                dpg.add_theme_color(dpg.mvThemeCol_Text, (150, 150, 150, 255))
                dpg.add_theme_color(dpg.mvThemeCol_TextDisabled, (100, 100, 100, 255))
                dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (5, 5, 5, 255))

    # MARK: Initialize Viewport

    def initialize_viewport(self) -> None:
        """Set up Dear PyGui context, fonts, and viewport."""

        dpg.create_context()
        with dpg.font_registry():
            font_path = str(Path(__file__).parent / FONT)
            with dpg.font(file=font_path, size=18) as default_font:
                dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
            dpg.bind_font(default_font)
        dpg.create_viewport(
            title="TWELITE STICK", width=620, height=440, resizable=True
        )
        dpg.set_viewport_vsync(False)
        dpg.setup_dearpygui()
        dpg.show_viewport()

    def create_windows(self) -> None:
        """Create and configure all Dear PyGui windows and their contents."""

        # MARK: Panel Window
        with dpg.window(
            no_title_bar=True,
            no_close=True,
            no_collapse=True,
            no_move=True,
            no_resize=True,
            pos=(10, 10),
            width=600,
            height=100,
        ):
            ports = mw.utils.get_ports()
            with dpg.group(horizontal=True):
                with dpg.child_window(width=300):
                    dpg.add_text("シリアルポート")
                    dpg.add_combo(
                        items=ports,
                        default_value=ports[0],
                        tag="ports_combo",
                        width=280,
                        enabled=True,
                        callback=self.on_select_port,
                    )
                with dpg.child_window(width=150):
                    dpg.add_text("シリアル通信")
                    with dpg.group(horizontal=True):
                        dpg.add_button(
                            label="接続",
                            tag="start_button",
                            enabled=True,
                            callback=self.on_start,
                        )
                        dpg.add_button(
                            label="切断",
                            tag="stop_button",
                            enabled=False,
                            callback=self.on_stop,
                        )

        # MARK: Input Window
        with dpg.window(
            label="TWELITE DIP 入力",
            no_close=True,
            # no_resize=True,
            pos=(10, 120),
            width=600,
            height=160,
            tag="window_twelite_dip_input",
        ):
            with dpg.group(horizontal=False):
                # DOx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        with dpg.group(horizontal=True):
                            with dpg.drawlist(
                                width=20, height=20, tag=f"indicator_do{p}"
                            ):
                                dpg.draw_circle(
                                    center=(10, 10),
                                    radius=10,
                                    fill=(0, 0, 0, 255),
                                    tag=f"circle_do{p}",
                                )
                            dpg.add_text(f"DO{p}")
                dpg.add_spacer()
                dpg.add_separator()
                dpg.add_spacer()
                # PWMx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        with dpg.plot(
                            label=f"PWM{p}",
                            height=80,
                            width=140,
                            tag=f"plot_pwm{p}",
                            no_frame=True,
                            no_mouse_pos=True,
                        ):
                            dpg.add_plot_axis(
                                dpg.mvXAxis,
                                tag=f"plot_pwm{p}_x",
                                no_tick_labels=True,
                            )
                            with dpg.plot_axis(
                                dpg.mvYAxis,
                                tag=f"plot_pwm{p}_y",
                                no_tick_labels=True,
                                no_tick_marks=True,
                                no_gridlines=True,
                            ):
                                dpg.add_line_series(
                                    [],
                                    [],
                                    tag=f"series_pwm{p}",
                                    parent=f"plot_pwm{p}_y",
                                )

        # MARK: Output Window
        with dpg.window(
            label="TWELITE DIP 出力",
            no_close=True,
            no_resize=True,
            pos=(10, 290),
            width=600,
            height=140,
            tag="window_twelite_dip_output",
        ):
            with dpg.group(horizontal=False):
                # DIx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        dpg.add_checkbox(
                            label=f"DI{p}",
                            tag=f"checkbox_di{p}",
                            enabled=False,
                            callback=self.on_check_di,
                            user_data=p,
                        )
                dpg.add_spacer()
                dpg.add_separator()
                dpg.add_spacer()
                # AIx
                with dpg.group(horizontal=False):
                    with dpg.group(horizontal=True):
                        for p in range(1, 5):
                            with dpg.group(horizontal=False):
                                dpg.add_text(f"AI{p}")
                                dpg.add_slider_int(
                                    label="",
                                    width=80,
                                    default_value=0,
                                    min_value=0,
                                    max_value=2000,
                                    tag=f"slider_ai{p}",
                                    enabled=False,
                                    callback=self.on_change_ai,
                                    user_data=p,
                                )

    def update_state(self, new_state: State) -> None:
        """Update the UI and internal state based on the application's current status."""

        # MARK: UI State
        self.state = new_state
        match new_state:
            case self.State.IDLE:
                dpg.configure_item("ports_combo", enabled=True)
                dpg.configure_item("start_button", enabled=True)
                dpg.configure_item("stop_button", enabled=False)
                for p in range(1, 5):
                    dpg.configure_item(f"checkbox_di{p}", enabled=False)
                    dpg.configure_item(f"slider_ai{p}", enabled=False)
                dpg.bind_item_theme("window_twelite_dip_input", self.theme_disabled)
                dpg.bind_item_theme("window_twelite_dip_output", self.theme_disabled)
            case self.State.CONNECTING:
                dpg.configure_item("ports_combo", enabled=False)
                dpg.configure_item("start_button", enabled=False)
                dpg.configure_item("stop_button", enabled=True)
                for p in range(1, 5):
                    dpg.configure_item(f"checkbox_di{p}", enabled=True)
                    dpg.configure_item(f"slider_ai{p}", enabled=False)
                dpg.bind_item_theme("window_twelite_dip_input", 0)
                dpg.bind_item_theme("window_twelite_dip_output", 0)

    # MARK: UI Handlers

    def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle serial port selection from the combo box."""

        self.selected_port = app_data

    def on_start(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle the start button click and initiate communication."""

        self.start()

    def on_stop(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle the stop button click and terminate communication."""

        self.update_state(self.State.IDLE)
        self.twelite.close()

    def on_check_di(self, sender: Any, app_data: bool, user_data: int) -> None:
        """Handle checkbox state changes for DI output control."""

        if 1 <= user_data <= 4:
            self.command_app_twelite.di_state[user_data - 1] = app_data
            self.twelite.send(self.command_app_twelite)

    def on_change_ai(self, sender: Any, app_data: int, user_data: int) -> None:
        """Handle slider changes for AI values and send PWM updates."""

        if 1 <= user_data <= 4:
            self.command_app_twelite.pwm_duty[user_data - 1] = app_data * 1024 // 2000
            self.twelite.send(self.command_app_twelite)

    # MARK: Packet handler

    def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        """Update the GUI based on incoming TWELITE DIP packet data."""

        now = perf_counter()
        for p in range(1, 5):
            # DO
            p_state = packet.di_state[p - 1]
            match p:
                case 1:  # RED
                    color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                case 2:  # GREEN
                    color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 3:  # YELLOW
                    color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 4:  # BLUE
                    color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
            dpg.configure_item(f"circle_do{p}", fill=color)
            # PWM
            self.pwm_data[p] = packet.ai_voltage[p - 1]
            self.pwm_plot_data[p].append((now, self.pwm_data[p]))

    # MARK: Plot

    def update_plot(self) -> None:
        """Update the plot with the latest PWM data."""

        now = perf_counter()
        # Throttle plot updates to PLOT_UPDATE_INTERVAL
        if now - self.last_plot < PLOT_UPDATE_INTERVAL:
            return
        self.last_plot = now
        for p in range(1, 5):
            # If no new value was added recently, duplicate the last value
            if self.pwm_plot_data[p]:
                last_time, last_val = self.pwm_plot_data[p][-1]
                if now - last_time > PLOT_UPDATE_INTERVAL:
                    self.pwm_plot_data[p].append((now, last_val))
            else:
                self.pwm_plot_data[p].append((now, self.pwm_data[p]))
            # Remove old values older than PLOT_X_LIMIT
            while (
                self.pwm_plot_data[p]
                and now - self.pwm_plot_data[p][0][0] > PLOT_X_LIMIT
            ):
                self.pwm_plot_data[p].popleft()
            x_series, y_series = (
                zip(*list(self.pwm_plot_data[p])) if self.pwm_plot_data[p] else ([], [])
            )
            if x_series and y_series:
                dpg.set_value(f"series_pwm{p}", [x_series, y_series])
                dpg.set_axis_limits(f"plot_pwm{p}_x", x_series[0], x_series[-1])
                dpg.set_axis_limits(f"plot_pwm{p}_y", 0, 2000)

    # MARK: Initialize Variables

    def initialize(self) -> None:
        """Initialize internal command structure and default values."""

        ports = mw.utils.get_ports()
        self.selected_port = ports[0]  # default

        command_app_twelite_initial: dict[str, Any] = {
            "destination_logical_id": 0x78,  # All child devices
            "di_to_change": [True, True, True, True],  # Enable DI1-4
            "di_state": [False, False, False, False],  # Initial state of DIx
            "pwm_to_change": [True, True, True, True],  # Enable AI1-4
            "pwm_duty": [0, 0, 0, 0],  # Initial state of AIx
        }
        self.command_app_twelite = mw.serializers.app_twelite.Command(
            **command_app_twelite_initial
        )

        self.pwm_data = {1: 0, 2: 0, 3: 0, 4: 0}
        self.pwm_plot_data = {1: deque(), 2: deque(), 3: deque(), 4: deque()}
        self.last_plot = 0.0

    # MARK: Start

    def start(self) -> None:
        """Start TWELITE communication and register listeners."""

        # Create a twelite object
        self.twelite = mw.Twelite(self.selected_port)
        # Use JST for received data
        self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
        # Register event handler(s)
        self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)

        self.update_state(self.State.CONNECTING)

    # MARK: loop

    def loop(self) -> None:
        """Perform periodic updates while the application is running."""

        match self.state:
            case self.State.IDLE:
                pass
            case self.State.CONNECTING:
                self.twelite.update()
                self.update_plot()
        dpg.render_dearpygui_frame()

    # MARK: Show

    def show(self) -> None:
        """Main execution loop of the application."""

        try:
            while dpg.is_dearpygui_running():
                self.loop()
        except KeyboardInterrupt:
            pass
        finally:
            print("Quit...")
            self.close()

    # MARK: Lifecycle

    def __enter__(self) -> Self:
        """Enter the runtime context related to this object."""

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        """Exit the runtime context and close the connection."""

        self.close()
        return None

    def __del__(self) -> None:
        """Destructor to ensure resources are cleaned up."""

        self.close()

    def close(self) -> None:
        """Cleanly shut down TWELITE connection and Dear PyGui context."""

        if self.state == self.State.CONNECTING:
            self.twelite.close()
        dpg.destroy_context()


# MARK: Entry point

if __name__ == "__main__":
    with MainViewport() as viewport:
        viewport.show()

スクリプトの実行

スクリプトを実行します。


python stick_dip_gui.py
  1. 「シリアルポート」のコンボボックスから、使用するポート(COM3, /dev/tty.usb*など)を選択
  2. 「シリアル通信」の「接続」ボタンを押して通信を開始
  3. TWELITE DIP の入力を変化させて、「TWELITE DIP 入力」ウィンドウの変化を確認
  4. 「TWELITE DIP 出力」ウィンドウのチェックボックスやスライダーを変化させて、TWELITE DIP の出力の変化を確認
  5. 「シリアル通信」の「切断」ボタンを押して通信を終了
  6. ウィンドウを閉じる
表示例

表示例

正常に通信できていれば、TWELITE DIP の状態をリアルタイムに表示しつつ、TWELITE DIP の出力を任意のタイミングで変更できるはずです。

スクリプトの解説

Dear PyGui の初期化は initialize_viewport() が行っています。

フォントファイルや、OSのウィンドウ(Viewportと呼ぶ)の大きさはここで登録しています。

    def initialize_viewport(self) -> None:
        """Set up Dear PyGui context, fonts, and viewport."""

        dpg.create_context()
        with dpg.font_registry():
            font_path = str(Path(__file__).parent / FONT)
            with dpg.font(file=font_path, size=18) as default_font:
                dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
            dpg.bind_font(default_font)
        dpg.create_viewport(
            title="TWELITE STICK", width=620, height=440, resizable=True
        )
        dpg.set_viewport_vsync(False)
        dpg.setup_dearpygui()
        dpg.show_viewport()

OSのウィンドウ内のインタフェースは、create_windows() に定義しています。

画面内の子ウィンドウは dpg.window()によって定義され、配下のコンポーネントを定義していきます。

コンポーネントには tag を登録することで、値や属性をプログラムから変更できます(HTMLのid=のようなイメージ)。

    def create_windows(self) -> None:
        """Create and configure all Dear PyGui windows and their contents."""

        # MARK: Panel Window
        with dpg.window(
            no_title_bar=True,
            no_close=True,
            no_collapse=True,
            no_move=True,
            no_resize=True,
            pos=(10, 10),
            width=600,
            height=100,
        ):
            ports = mw.utils.get_ports()
            with dpg.group(horizontal=True):
                with dpg.child_window(width=300):
                    dpg.add_text("シリアルポート")
                    dpg.add_combo(
                        items=ports,
                        default_value=ports[0],
                        tag="ports_combo",
                        width=280,
                        enabled=True,
                        callback=self.on_select_port,
                    )
                ...

また、入力を受け付けるボタンやコンボボックス、スライダーやチェックボックスでは、callback を登録できます。

    def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle serial port selection from the combo box."""

        self.selected_port = app_data

ほかのスクリプトと同様に、TWELITE の子機から受信したデータは、イベントハンドラで受け取ることができます。

on_app_twelite() では、DOxに反映するデータを tag によって即座に登録し、PWMxに反映するデータはメンバ変数を通じて update_plot() へ渡しています。

    def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        """Update the GUI based on incoming TWELITE DIP packet data."""

        now = perf_counter()
        for p in range(1, 5):
            # DO
            p_state = packet.di_state[p - 1]
            match p:
                case 1:  # RED
                    color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                case 2:  # GREEN
                    color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 3:  # YELLOW
                    color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 4:  # BLUE
                    color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
            dpg.configure_item(f"circle_do{p}", fill=color)
            # PWM
            self.pwm_data[p] = packet.ai_voltage[p - 1]
            self.pwm_plot_data[p].append((now, self.pwm_data[p]))

「開始」ボタンを押したときは、start() が呼び出されます。

ここで mwings.Twelite を初期化し、受信ハンドラを登録しています。

    def start(self) -> None:
        """Start TWELITE communication and register listeners."""

        # Create a twelite object
        self.twelite = mw.Twelite(self.selected_port)
        # Use JST for received data
        self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
        # Register event handler(s)
        self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)

        self.update_state(self.State.CONNECTING)

Dear PyGui は低レベルな API を使って構築されているため、各フレームの描画をアプリケーションから指示し、描画ループを自前で定義することができます。OS のウィンドウが表示されている間、loop()は無限に呼び出されます。

シリアルポートへ接続したのち、loop() が行う処理は3つあります。

    def loop(self) -> None:
        """Perform periodic updates while the application is running."""

        match self.state:
            case self.State.IDLE:
                pass
            case self.State.CONNECTING:
                self.twelite.update()
                self.update_plot()
        dpg.render_dearpygui_frame()

画面全体の処理は、show() を呼び出すことで開始されます。

    def show(self) -> None:
        """Main execution loop of the application."""

        try:
            while dpg.is_dearpygui_running():
                self.loop()
        except KeyboardInterrupt:
            pass
        finally:
            print("Quit...")
            self.close()

プログラムは、画面を閉じる dpg.is_dearpygui_running()== False か、Ctrl+Cを入力する KeyboardInterrupt ことで描画ループを抜け、mwings.Twelitedearpygui のクローズ処理を経て終了します。