Python による子機との通信(グラフィカルUI編)
TWELTIE DIP と通信する
TWELITE DIP と通信するアプリケーションを、簡単なユーザインタフェースとともに構築してみましょう。

画面の表示例
TWELITE DIP を対象とした基本編のスクリプトでは DI1
と DO1
の簡単な操作に留めましたが、実際にモニタリングを行うためのアプリケーションは、TWELITE STAGE APPのようなユーザインタフェースを必要とします。
Dear PyGui について
ここでは、Dear PyGui と MWings ライブラリを組み合わせて使います。
Dear PyGui (DPG) は、C++ で書かれた Dear ImGui と OpenGL をベースにしたUIツールキットです。
開発ツールのUIに利用されることが多く、高度なカスタマイズを苦手とする反面、軽量な動作とシンプルな記述を特徴としています。
TWELITE DIP の配線と起動
このアプリケーションでは、TWELITE DIP の DIx
や AIx
に入力された信号を受信すると同時に、DOx
や PWMx
へ信号を出力させます。
VCC
と GND
のほか、以下に示す16のピンを任意に接続してください。
種別 | ピン | 1 | 2 | 3 | 4 | 範囲 | 備考 |
---|---|---|---|---|---|---|---|
デジタル入力 | DIx | #15 | #16 | #17 | #18 | 0.0V - VCC | 内部プルアップ |
アナログ入力 | AIx | #22 | #23 | #24 | #25 | 0.0V - 2.0V | 超過で無効扱い |
デジタル出力 | DOx | #5 | #8 | #9 | #12 | 0.0V - VCC | LEDカソード接続 |
PWM出力 | PWMx | #4 | #6 | #7 | #11 | 0% - 100% | 電圧はVCC レベル |
スクリプトの作成と実行
モジュールの導入
mwings
(またはmwingslite
)およびdearpygui
モジュールを導入します。
pip install mwings dearpygui
スクリプトの作成
後述のスクリプト stick_dip_gui.py
を作成してください。mwings.parsers.app_twelite
を使ってデータを受信し、mwings.serializers.app_twelite
を使ってデータを送信する機能をdearpygui
経由で提供します。
日本語を表示するためには、フォントファイルが必要です。
ここでは、M PLUS FONTS の Mplus1Code-Medium.otf
が同じ階層にあることを前提としています。
Project
├ stick_dip_gui.py
└ Mplus1Code-Medium.otf
# -*- coding:utf-8 -*-
from time import perf_counter
from enum import IntEnum, auto
from zoneinfo import ZoneInfo
from typing import Any, Self, final
from types import TracebackType
from pathlib import Path
from collections import deque
import mwings as mw
import dearpygui.dearpygui as dpg # type: ignore
PLOT_UPDATE_INTERVAL = 1.0 / 100 # 100Hz
PLOT_X_LIMIT = 5.0 # 3 seconds
FONT = "Mplus1Code-Medium.otf"
# MARK: MainViewport
@final
class MainViewport:
@final
class State(IntEnum):
"""Application state enumeration."""
IDLE = auto()
CONNECTING = auto()
selected_port: str
twelite: mw.Twelite
command_app_twelite: mw.serializers.app_twelite.Command
pwm_data: dict[int, int]
pwm_plot_data: dict[int, deque[tuple[float, int]]]
last_plot: float
state: State
theme_disabled: int
def __init__(self) -> None:
"""Initialize the main window and prepare all components."""
self.initialize_viewport()
self.create_themes()
self.create_windows()
self.initialize()
self.update_state(self.State.IDLE)
def create_themes(self) -> None:
"""Create a visual theme for disabled UI elements."""
with dpg.theme() as self.theme_disabled:
with dpg.theme_component(dpg.mvAll):
dpg.add_theme_color(dpg.mvThemeCol_Text, (150, 150, 150, 255))
dpg.add_theme_color(dpg.mvThemeCol_TextDisabled, (100, 100, 100, 255))
dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (5, 5, 5, 255))
# MARK: Initialize Viewport
def initialize_viewport(self) -> None:
"""Set up Dear PyGui context, fonts, and viewport."""
dpg.create_context()
with dpg.font_registry():
font_path = str(Path(__file__).parent / FONT)
with dpg.font(file=font_path, size=18) as default_font:
dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
dpg.bind_font(default_font)
dpg.create_viewport(
title="TWELITE STICK", width=620, height=440, resizable=True
)
dpg.set_viewport_vsync(False)
dpg.setup_dearpygui()
dpg.show_viewport()
def create_windows(self) -> None:
"""Create and configure all Dear PyGui windows and their contents."""
# MARK: Panel Window
with dpg.window(
no_title_bar=True,
no_close=True,
no_collapse=True,
no_move=True,
no_resize=True,
pos=(10, 10),
width=600,
height=100,
):
ports = mw.utils.get_ports()
with dpg.group(horizontal=True):
with dpg.child_window(width=300):
dpg.add_text("シリアルポート")
dpg.add_combo(
items=ports,
default_value=ports[0],
tag="ports_combo",
width=280,
enabled=True,
callback=self.on_select_port,
)
with dpg.child_window(width=150):
dpg.add_text("シリアル通信")
with dpg.group(horizontal=True):
dpg.add_button(
label="接続",
tag="start_button",
enabled=True,
callback=self.on_start,
)
dpg.add_button(
label="切断",
tag="stop_button",
enabled=False,
callback=self.on_stop,
)
# MARK: Input Window
with dpg.window(
label="TWELITE DIP 入力",
no_close=True,
# no_resize=True,
pos=(10, 120),
width=600,
height=160,
tag="window_twelite_dip_input",
):
with dpg.group(horizontal=False):
# DOx
with dpg.group(horizontal=True):
for p in range(1, 5):
with dpg.group(horizontal=True):
with dpg.drawlist(
width=20, height=20, tag=f"indicator_do{p}"
):
dpg.draw_circle(
center=(10, 10),
radius=10,
fill=(0, 0, 0, 255),
tag=f"circle_do{p}",
)
dpg.add_text(f"DO{p}")
dpg.add_spacer()
dpg.add_separator()
dpg.add_spacer()
# PWMx
with dpg.group(horizontal=True):
for p in range(1, 5):
with dpg.plot(
label=f"PWM{p}",
height=80,
width=140,
tag=f"plot_pwm{p}",
no_frame=True,
no_mouse_pos=True,
):
dpg.add_plot_axis(
dpg.mvXAxis,
tag=f"plot_pwm{p}_x",
no_tick_labels=True,
)
with dpg.plot_axis(
dpg.mvYAxis,
tag=f"plot_pwm{p}_y",
no_tick_labels=True,
no_tick_marks=True,
no_gridlines=True,
):
dpg.add_line_series(
[],
[],
tag=f"series_pwm{p}",
parent=f"plot_pwm{p}_y",
)
# MARK: Output Window
with dpg.window(
label="TWELITE DIP 出力",
no_close=True,
no_resize=True,
pos=(10, 290),
width=600,
height=140,
tag="window_twelite_dip_output",
):
with dpg.group(horizontal=False):
# DIx
with dpg.group(horizontal=True):
for p in range(1, 5):
dpg.add_checkbox(
label=f"DI{p}",
tag=f"checkbox_di{p}",
enabled=False,
callback=self.on_check_di,
user_data=p,
)
dpg.add_spacer()
dpg.add_separator()
dpg.add_spacer()
# AIx
with dpg.group(horizontal=False):
with dpg.group(horizontal=True):
for p in range(1, 5):
with dpg.group(horizontal=False):
dpg.add_text(f"AI{p}")
dpg.add_slider_int(
label="",
width=80,
default_value=0,
min_value=0,
max_value=2000,
tag=f"slider_ai{p}",
enabled=False,
callback=self.on_change_ai,
user_data=p,
)
def update_state(self, new_state: State) -> None:
"""Update the UI and internal state based on the application's current status."""
# MARK: UI State
self.state = new_state
match new_state:
case self.State.IDLE:
dpg.configure_item("ports_combo", enabled=True)
dpg.configure_item("start_button", enabled=True)
dpg.configure_item("stop_button", enabled=False)
for p in range(1, 5):
dpg.configure_item(f"checkbox_di{p}", enabled=False)
dpg.configure_item(f"slider_ai{p}", enabled=False)
dpg.bind_item_theme("window_twelite_dip_input", self.theme_disabled)
dpg.bind_item_theme("window_twelite_dip_output", self.theme_disabled)
case self.State.CONNECTING:
dpg.configure_item("ports_combo", enabled=False)
dpg.configure_item("start_button", enabled=False)
dpg.configure_item("stop_button", enabled=True)
for p in range(1, 5):
dpg.configure_item(f"checkbox_di{p}", enabled=True)
dpg.configure_item(f"slider_ai{p}", enabled=False)
dpg.bind_item_theme("window_twelite_dip_input", 0)
dpg.bind_item_theme("window_twelite_dip_output", 0)
# MARK: UI Handlers
def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
"""Handle serial port selection from the combo box."""
self.selected_port = app_data
def on_start(self, sender: Any, app_data: str, user_data: Any) -> None:
"""Handle the start button click and initiate communication."""
self.start()
def on_stop(self, sender: Any, app_data: str, user_data: Any) -> None:
"""Handle the stop button click and terminate communication."""
self.update_state(self.State.IDLE)
self.twelite.close()
def on_check_di(self, sender: Any, app_data: bool, user_data: int) -> None:
"""Handle checkbox state changes for DI output control."""
if 1 <= user_data <= 4:
self.command_app_twelite.di_state[user_data - 1] = app_data
self.twelite.send(self.command_app_twelite)
def on_change_ai(self, sender: Any, app_data: int, user_data: int) -> None:
"""Handle slider changes for AI values and send PWM updates."""
if 1 <= user_data <= 4:
self.command_app_twelite.pwm_duty[user_data - 1] = app_data * 1024 // 2000
self.twelite.send(self.command_app_twelite)
# MARK: Packet handler
def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
"""Update the GUI based on incoming TWELITE DIP packet data."""
now = perf_counter()
for p in range(1, 5):
# DO
p_state = packet.di_state[p - 1]
match p:
case 1: # RED
color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
case 2: # GREEN
color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
case 3: # YELLOW
color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
case 4: # BLUE
color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
dpg.configure_item(f"circle_do{p}", fill=color)
# PWM
self.pwm_data[p] = packet.ai_voltage[p - 1]
self.pwm_plot_data[p].append((now, self.pwm_data[p]))
# MARK: Plot
def update_plot(self) -> None:
"""Update the plot with the latest PWM data."""
now = perf_counter()
# Throttle plot updates to PLOT_UPDATE_INTERVAL
if now - self.last_plot < PLOT_UPDATE_INTERVAL:
return
self.last_plot = now
for p in range(1, 5):
# If no new value was added recently, duplicate the last value
if self.pwm_plot_data[p]:
last_time, last_val = self.pwm_plot_data[p][-1]
if now - last_time > PLOT_UPDATE_INTERVAL:
self.pwm_plot_data[p].append((now, last_val))
else:
self.pwm_plot_data[p].append((now, self.pwm_data[p]))
# Remove old values older than PLOT_X_LIMIT
while (
self.pwm_plot_data[p]
and now - self.pwm_plot_data[p][0][0] > PLOT_X_LIMIT
):
self.pwm_plot_data[p].popleft()
x_series, y_series = (
zip(*list(self.pwm_plot_data[p])) if self.pwm_plot_data[p] else ([], [])
)
if x_series and y_series:
dpg.set_value(f"series_pwm{p}", [x_series, y_series])
dpg.set_axis_limits(f"plot_pwm{p}_x", x_series[0], x_series[-1])
dpg.set_axis_limits(f"plot_pwm{p}_y", 0, 2000)
# MARK: Initialize Variables
def initialize(self) -> None:
"""Initialize internal command structure and default values."""
ports = mw.utils.get_ports()
self.selected_port = ports[0] # default
command_app_twelite_initial: dict[str, Any] = {
"destination_logical_id": 0x78, # All child devices
"di_to_change": [True, True, True, True], # Enable DI1-4
"di_state": [False, False, False, False], # Initial state of DIx
"pwm_to_change": [True, True, True, True], # Enable AI1-4
"pwm_duty": [0, 0, 0, 0], # Initial state of AIx
}
self.command_app_twelite = mw.serializers.app_twelite.Command(
**command_app_twelite_initial
)
self.pwm_data = {1: 0, 2: 0, 3: 0, 4: 0}
self.pwm_plot_data = {1: deque(), 2: deque(), 3: deque(), 4: deque()}
self.last_plot = 0.0
# MARK: Start
def start(self) -> None:
"""Start TWELITE communication and register listeners."""
# Create a twelite object
self.twelite = mw.Twelite(self.selected_port)
# Use JST for received data
self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
# Register event handler(s)
self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)
self.update_state(self.State.CONNECTING)
# MARK: loop
def loop(self) -> None:
"""Perform periodic updates while the application is running."""
match self.state:
case self.State.IDLE:
pass
case self.State.CONNECTING:
self.twelite.update()
self.update_plot()
dpg.render_dearpygui_frame()
# MARK: Show
def show(self) -> None:
"""Main execution loop of the application."""
try:
while dpg.is_dearpygui_running():
self.loop()
except KeyboardInterrupt:
pass
finally:
print("Quit...")
self.close()
# MARK: Lifecycle
def __enter__(self) -> Self:
"""Enter the runtime context related to this object."""
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> bool | None:
"""Exit the runtime context and close the connection."""
self.close()
return None
def __del__(self) -> None:
"""Destructor to ensure resources are cleaned up."""
self.close()
def close(self) -> None:
"""Cleanly shut down TWELITE connection and Dear PyGui context."""
if self.state == self.State.CONNECTING:
self.twelite.close()
dpg.destroy_context()
# MARK: Entry point
if __name__ == "__main__":
with MainViewport() as viewport:
viewport.show()
スクリプトの実行
スクリプトを実行します。
python stick_dip_gui.py
- 「シリアルポート」のコンボボックスから、使用するポート(
COM3
,/dev/tty.usb*
など)を選択 - 「シリアル通信」の「接続」ボタンを押して通信を開始
- TWELITE DIP の入力を変化させて、「TWELITE DIP 入力」ウィンドウの変化を確認
- 「TWELITE DIP 出力」ウィンドウのチェックボックスやスライダーを変化させて、TWELITE DIP の出力の変化を確認
- 「シリアル通信」の「切断」ボタンを押して通信を終了
- ウィンドウを閉じる

表示例
正常に通信できていれば、TWELITE DIP の状態をリアルタイムに表示しつつ、TWELITE DIP の出力を任意のタイミングで変更できるはずです。
グラフの更新タイミング
PWMx
のリアルタイム描画グラフは、過去5秒間の状態を 10ms の間隔で反映します。これはスクリプト冒頭の変数が決めています。
PLOT_UPDATE_INTERVAL = 1.0 / 100 # 100Hz
PLOT_X_LIMIT = 5.0 # 3 seconds
スクリプトの解説
Dear PyGui の初期化は initialize_viewport()
が行っています。
フォントファイルや、OSのウィンドウ(Viewportと呼ぶ)の大きさはここで登録しています。
def initialize_viewport(self) -> None:
"""Set up Dear PyGui context, fonts, and viewport."""
dpg.create_context()
with dpg.font_registry():
font_path = str(Path(__file__).parent / FONT)
with dpg.font(file=font_path, size=18) as default_font:
dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
dpg.bind_font(default_font)
dpg.create_viewport(
title="TWELITE STICK", width=620, height=440, resizable=True
)
dpg.set_viewport_vsync(False)
dpg.setup_dearpygui()
dpg.show_viewport()
OSのウィンドウ内のインタフェースは、create_windows()
に定義しています。
画面内の子ウィンドウは dpg.window()
によって定義され、配下のコンポーネントを定義していきます。
コンポーネントには tag
を登録することで、値や属性をプログラムから変更できます(HTMLのid=
のようなイメージ)。
def create_windows(self) -> None:
"""Create and configure all Dear PyGui windows and their contents."""
# MARK: Panel Window
with dpg.window(
no_title_bar=True,
no_close=True,
no_collapse=True,
no_move=True,
no_resize=True,
pos=(10, 10),
width=600,
height=100,
):
ports = mw.utils.get_ports()
with dpg.group(horizontal=True):
with dpg.child_window(width=300):
dpg.add_text("シリアルポート")
dpg.add_combo(
items=ports,
default_value=ports[0],
tag="ports_combo",
width=280,
enabled=True,
callback=self.on_select_port,
)
...
また、入力を受け付けるボタンやコンボボックス、スライダーやチェックボックスでは、callback
を登録できます。
def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
"""Handle serial port selection from the combo box."""
self.selected_port = app_data
ほかのスクリプトと同様に、TWELITE の子機から受信したデータは、イベントハンドラで受け取ることができます。
Nuitkaで実行ファイルをビルドする場合
Nuitkaを使って.exe
などの実行ファイルをビルドする場合、メソッドのバインディングをC言語に変換する際にself
が重複し、引数の数を超過してしまう場合があります。このような場合には、クロージャを返すように実装してください。
from typing import Any, Callable
...
def cb_for_select_port(self) -> Callable[[Any, str, Any], None]:
"""Make a callback for serial port selection from the combo box."""
def callback(sender: Any, app_data: str, user_data: Any) -> None:
self.selected_port = app_data
return callback
...
dpg.add_combo(
...
callback=self.cb_for_select_port(),
)
on_app_twelite()
では、DOx
に反映するデータを tag
によって即座に登録し、PWMx
に反映するデータはメンバ変数を通じて update_plot()
へ渡しています。
def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
"""Update the GUI based on incoming TWELITE DIP packet data."""
now = perf_counter()
for p in range(1, 5):
# DO
p_state = packet.di_state[p - 1]
match p:
case 1: # RED
color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
case 2: # GREEN
color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
case 3: # YELLOW
color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
case 4: # BLUE
color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
dpg.configure_item(f"circle_do{p}", fill=color)
# PWM
self.pwm_data[p] = packet.ai_voltage[p - 1]
self.pwm_plot_data[p].append((now, self.pwm_data[p]))
「開始」ボタンを押したときは、start()
が呼び出されます。
ここで mwings.Twelite
を初期化し、受信ハンドラを登録しています。
@on
ではなく、add_listener
メソッドを使っています。
def start(self) -> None:
"""Start TWELITE communication and register listeners."""
# Create a twelite object
self.twelite = mw.Twelite(self.selected_port)
# Use JST for received data
self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
# Register event handler(s)
self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)
self.update_state(self.State.CONNECTING)
Dear PyGui は低レベルな API を使って構築されているため、各フレームの描画をアプリケーションから指示し、描画ループを自前で定義することができます。OS のウィンドウが表示されている間、loop()
は無限に呼び出されます。
シリアルポートへ接続したのち、loop()
が行う処理は3つあります。
mwings.Twelite.update()
による受信処理update_plot()
によるPWMx
のグラフの更新処理dpg.render_dearpygui_frame()
によるUIの描画および更新処理
mwings.Twelite.start()
は使わず、手動で更新処理を行うための mwings.Twelite.update()
を利用しています。
def loop(self) -> None:
"""Perform periodic updates while the application is running."""
match self.state:
case self.State.IDLE:
pass
case self.State.CONNECTING:
self.twelite.update()
self.update_plot()
dpg.render_dearpygui_frame()
画面全体の処理は、show()
を呼び出すことで開始されます。
def show(self) -> None:
"""Main execution loop of the application."""
try:
while dpg.is_dearpygui_running():
self.loop()
except KeyboardInterrupt:
pass
finally:
print("Quit...")
self.close()
プログラムは、画面を閉じる dpg.is_dearpygui_running()
== False
か、Ctrl+C
を入力する KeyboardInterrupt
ことで描画ループを抜け、mwings.Twelite
や dearpygui
のクローズ処理を経て終了します。