This is the multi-page printable view of this section. Click here to print...

Return to the regular view of this page

As of 2025-07-24

Communicating with Child Devices Using Python (Basic)

How to communicate with child devices using Python
By using a dedicated library, you can communicate with TWELITE child devices via TWELITE STICK from Python.

Basic Operation Check

Connect the TWELITE STICK

Connect the TWELITE STICK to your PC’s USB port.

The factory-default TWELITE is configured in Parent mode of the Parent and Repeater App. It can send and receive data to and from child devices in the TWELITE series, and the logo should light up magenta.

Factory default state

Factory default state

Installing the MWings Library

Prepare a Python 3.12 or later environment.

Install the MWings module, which interprets the output of the Parent and Repeater App.


pip install mwings

The MWings module allows communication with TWELITE child devices via a TWELITE parent device connected to the host.

  • Interprets data received from the parent and converts it into a dictionary, JSON, or pandas DataFrame format*
  • Sends commands generated from dictionaries to the parent device

*The Lite version does not support pandas

Confirming Data Reception

Verify that TWELITE STICK can interpret the received data.

Please run the sample script rx_print_json.py available on GitHub.

This script will print the contents of all types of packets received by the TWELITE STICK in JSON format to the terminal, allowing you to confirm the data.

# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black

# MWings example: Receive data, print JSON, typed

from zoneinfo import ZoneInfo

import mwings as mw


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_ARIA)
    def on_app_aria(packet: mw.parsers.app_aria.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_CUE)
    def on_app_cue(packet: mw.parsers.app_cue.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_CUE_PAL_EVENT)
    def on_app_cue_pal_event(packet: mw.parsers.app_cue_pal_event.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_IO)
    def on_app_io(packet: mw.parsers.app_io.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_TWELITE)
    def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_PAL_AMB)
    def on_app_pal_amb(packet: mw.parsers.app_pal_amb.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_PAL_MOT)
    def on_app_pal_mot(packet: mw.parsers.app_pal_mot.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_PAL_OPENCLOSE)
    def on_app_pal_openclose(packet: mw.parsers.app_pal_openclose.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_UART_ASCII)
    def on_app_uart_ascii(packet: mw.parsers.app_uart_ascii.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_UART_ASCII_EXTENDED)
    def on_app_uart_ascii_extended(
        packet: mw.parsers.app_uart_ascii_extended.ParsedPacket,
    ) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.ACT)
    def on_act(packet: mw.parsers.act.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

For example, if you have prepared a factory-default TWELITE DIP running the Extremely Simple! Standard App as a child device, just like in the previous section Using TWELITE STAGE APP, you should see an output like the following. To stop the script, press Ctrl+C.

{
  "time_parsed": "2025-07-11T11:45:02.067804+09:00",
  "packet_type": "APP_TWELITE",
  "sequence_number": 4713,
  "source_serial_id": "8201007F",
  "source_logical_id": 120,
  "lqi": 166,
  "supply_voltage": 3254,
  "destination_logical_id": 0,
  "relay_count": 0,
  "periodic": true,
  "di_changed_1": false,
  "di_changed_2": false,
  "di_changed_3": false,
  "di_changed_4": false,
  "di_state_1": false,
  "di_state_2": false,
  "di_state_3": false,
  "di_state_4": false,
  "ai_voltage_1": 2000,
  "ai_voltage_2": 2000,
  "ai_voltage_3": 2000,
  "ai_voltage_4": 2000
}

python rx_export_csv_durable.py -h
usage: rx_export_csv_durable.py [-h] [-v] [-s]

Log packets from App_Wings to csv, line by line

options:
  -h, --help     show this help message and exit
  -v, --verbose  include system information
  -s, --sort     sort columns in the output

Sending and Receiving Arbitrary Data

Here, let’s create a script that specifically communicates with a TWELITE DIP child device prepared just like in Using TWELITE STAGE APP.

Example wiring of TWELITE DIP child device

Example wiring of TWELITE DIP child device

The goal is to display the state of a switch connected to the DI1 pin of the TWELITE DIP and control an LED connected to the DO1 pin.

Receiving Only Standard App Data

Modify rx_print_json.py to create a simplified script that only receives data from the Extremely Simple! Standard App.

# -*- coding:utf-8 -*-

from zoneinfo import ZoneInfo

import mwings as mw


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_TWELITE)
    def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

Extracting Specific Values

In the previous script, we used the to_json() method to output all data as a JSON string.

Here, we’ll extract only the state of the DI1 pin from parsers.app_twelite.ParsedPacket using the di_state field and display a virtual LED in the terminal.

Rewrite the on_app_twelite() handler as follows:

def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(f"\rDO1 LED: {"🔴" if packet.di_state[0] else "⚪"}", end='', flush=True)

If pressing the switch connected to the DI pin of TWELITE DIP causes the red light to appear, it indicates success.

Example display

Example display

Sending Commands to the Parent Device

In addition to displaying data received by the TWELITE STICK, you can also send data from the TWELITE STICK.

In the script’s main loop, we call threading.Thread.join() every 0.5 seconds so that the receiving thread can also terminate when the main thread ends.

        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)

Using this mechanism, let’s control the DO1 pin of the TWELITE DIP from the main loop and try blinking the LED every 0.5 seconds.

Modify the previous script as follows. The full script below includes all changes made so far.

# -*- coding:utf-8 -*-

from zoneinfo import ZoneInfo
from typing import Any

import mwings as mw


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_TWELITE)
    def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(f"\rDO1 LED: {"🔴" if packet.di_state[0] else "⚪"}", end='', flush=True)

    # Initialize command
    initial: dict[str, Any] = {
        "destination_logical_id": 0x78,  # All child devices
        "di_to_change": [True, False, False, False],  # Enable DI1
        "di_state": [False, False, False, False],  # Initial state of DIx
    }
    command = mw.serializers.app_twelite.Command(**initial)

    # Toggle the DI1 state
    def toggle_di1() -> None:
        command.di_state[0] = not command.di_state[0]
        twelite.send(command)

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            toggle_di1() # Send
            twelite.join(0.5) # Receive
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

When you run this script, it displays a virtual LED just like before.

Example display

Example display

At the same time, the LED connected to the DO1 pin of TWELITE DIP should continuously blink. When you stop the script, the blinking stops.

1 - Communicating with Child Devices Using Python (Web Server IoT)

Send data from child devices to a web server using Python
As a practical application, this guide demonstrates how to build an IoT system using a web server.

Collecting Temperature and Humidity Data from TWELITE ARIA

Let’s build a simple IoT system that receives temperature and humidity data from the sensor tag TWELITE ARIA, sends it to a web server, and displays it on a graph.

ThingSpeakの表示例

ThingSpeakの表示例

In the basic script for TWELITE DIP, we only performed simple operations on DI1 and DO1. However, in an actual IoT system, it is necessary to send acquired data to an upstream server using methods such as REST APIs.

About ThingSpeak

Here, we will use ThingSpeak, a service by MathWorks, in combination with the MWings library.

Creating an Account

Visit the ThingSpeak website and create a MathWorks account.

Creating a Channel

Create a “Channel” and configure it as shown below (the “Name” and “Description” can be arbitrary).

Channel Configuration Example

Channel Configuration Example

Obtaining API Keys

Go to the “API Keys” tab on the Channel page and note down the 16-character “Write API key”.

Configuring and Starting TWELITE ARIA

Changing Settings for TWELITE ARIA

Change the settings of TWELITE ARIA and set the Transmission Interval of TWELITE ARIA Mode to 20 seconds or more (to avoid overloading the server).

Starting TWELITE ARIA

Insert a CR2032 battery to power up TWELITE ARIA.

Insert CR2032 battery

Insert CR2032 battery

Writing and Running the Script

Installing Required Modules

Prepare Python 3.12 or later and install the mwings (or mwingslite) and requests modules.


pip install mwings requests

Creating the Script

Create the script stick_aria_thingspeak.py as shown below. Use mwings.parsers.app_aria to receive data and the requests module to send HTTP GET requests to ThingSpeak.

# -*- coding:utf-8 -*-

from zoneinfo import ZoneInfo
from time import perf_counter

import mwings as mw
import requests


API_KEY = "XXXXXXXXXXXXXXXX"  # Replace with your ThingSpeak API key
BASE_URL = "https://api.thingspeak.com/update"
SEND_MIN_INTERVAL = 20  # Minimum interval in seconds to send data to ThingSpeak


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Initialize last send time
    last_send_time = perf_counter() - SEND_MIN_INTERVAL

    # Initialize the target serial ID
    target_serial_id = -1

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_ARIA)
    def on_app_aria(packet: mw.parsers.app_aria.ParsedPacket) -> None:

        # Filter packets by serial ID
        if target_serial_id < 0:
            # Set the serial ID from the received packet
            target_serial_id = packet.source_serial_id
            print(f"Serial ID set to {target_serial_id:08X}")
        elif packet.source_serial_id != target_serial_id:
            # Ignore packets from other serial ID devices
            print(
                f"Ignoring packet from serial ID {packet.source_serial_id:08X}, expected {target_serial_id:08X}"
            )
            return

        # Throttle sending to ThingSpeak
        if perf_counter() - last_send_time < SEND_MIN_INTERVAL:
            print("Skipping send due to minimum interval")
            return  # Skip sending if within the minimum interval
        last_send_time = perf_counter()  # Update last send time

        # Send data to ThingSpeak
        payload = {
            "api_key": API_KEY,
            "field1": f"{packet.temp_100x / 100.0:.2f}",  # Temperature
            "field2": f"{packet.humid_100x / 100.0:.2f}",  # Humidity
            "field3": f"{packet.supply_voltage}",  # Supply voltage (mV)
            "field4": f"{packet.lqi}",  # Link Quality Indicator
        }
        response = requests.get(BASE_URL, params=payload)

        # Check the response status
        if response.status_code == 200:
            print(f"OK: entry ID = {response.text}")
        else:
            print(f"NG: status code = {response.status_code}")

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)  # Receive
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

Running the Script

Execute the script.


python stick_aria_thingspeak.py

If data is successfully received from TWELITE ARIA and sent correctly, you will see the following output showing entry IDs in sequence:

Started receiving
Serial ID set to 8201C2DC
OK: entry ID = 1
OK: entry ID = 2
OK: entry ID = 3
OK: entry ID = 4
OK: entry ID = 5
...

Click the “Private View” tab on ThingSpeak. The transmitted data should appear as a graph.

By default, the Y-axis range adjusts dynamically. You can configure the min and max values for each graph via the ✏️ icon.

Displaying approximately 2 days of data

Displaying approximately 2 days of data

Script Overview

The data sent to the server is constructed in the following section:

# Send data to ThingSpeak
payload = {
    "api_key": API_KEY,
    "field1": f"{packet.temp_100x / 100.0:.2f}",  # Temperature
    "field2": f"{packet.humid_100x / 100.0:.2f}",  # Humidity
    "field3": f"{packet.supply_voltage}",  # Supply voltage (mV)
    "field4": f"{packet.lqi}",  # Link Quality Indicator
}
response = requests.get(BASE_URL, params=payload)

From mwings.parsers.app_aria, we retrieve the temperature, humidity, coin cell voltage, and signal quality (LQI) expressed as a value between 0 and 255, then convert these to strings to construct the query for the GET request.

2 - Communicating with Child Devices Using Python (Graphical UI)

Build an application with a graphical UI to communicate with child devices using Python
As a practical example, we will create an application with a simple graphical user interface.

Communicating with TWELITE DIP

Let’s build an application to communicate with the TWELITE DIP, featuring a simple user interface.

Example of screen display

Example of screen display

In the basic script for TWELITE DIP, we only performed simple operations with DI1 and DO1. However, applications designed for actual monitoring require a user interface like the one found in TWELITE STAGE APP.

About Dear PyGui

Here, we will use a combination of Dear PyGui and the MWings library.

Wiring and Starting the TWELITE DIP

In this application, when a signal is received via DIx or AIx of the TWELITE DIP, a signal is simultaneously output to DOx or PWMx.

In addition to VCC and GND, freely connect any of the following 16 pins:

TypePin1234RangeNotes
Digital InputDIx#15#16#17#180.0V - VCCInternal pull-up
Analog InputAIx#22#23#24#250.0V - 2.0VInvalid if exceeded
Digital OutputDOx#5#8#9#120.0V - VCCConnect LED cathode
PWM OutputPWMx#4#6#7#110% - 100%Voltage at VCC level

Creating and Running the Script

Installing Modules

Install the mwings (or mwingslite) and dearpygui modules.


pip install mwings dearpygui

Creating the Script

Create the script stick_dip_gui.py as described below. It uses mwings.parsers.app_twelite to receive data and mwings.serializers.app_twelite to send data, with a GUI provided via dearpygui.

# -*- coding:utf-8 -*-

from time import perf_counter
from enum import IntEnum, auto
from zoneinfo import ZoneInfo
from typing import Any, Self, final
from types import TracebackType
from pathlib import Path
from collections import deque

import mwings as mw
import dearpygui.dearpygui as dpg  # type: ignore


PLOT_UPDATE_INTERVAL = 1.0 / 100  # 100Hz
PLOT_X_LIMIT = 5.0  # 3 seconds

FONT = "Mplus1Code-Medium.otf"

# MARK: MainViewport


@final
class MainViewport:
    @final
    class State(IntEnum):
        """Application state enumeration."""

        IDLE = auto()
        CONNECTING = auto()

    selected_port: str
    twelite: mw.Twelite

    command_app_twelite: mw.serializers.app_twelite.Command

    pwm_data: dict[int, int]
    pwm_plot_data: dict[int, deque[tuple[float, int]]]
    last_plot: float

    state: State
    theme_disabled: int

    def __init__(self) -> None:
        """Initialize the main window and prepare all components."""

        self.initialize_viewport()
        self.create_themes()
        self.create_windows()
        self.initialize()
        self.update_state(self.State.IDLE)

    def create_themes(self) -> None:
        """Create a visual theme for disabled UI elements."""

        with dpg.theme() as self.theme_disabled:
            with dpg.theme_component(dpg.mvAll):
                dpg.add_theme_color(dpg.mvThemeCol_Text, (150, 150, 150, 255))
                dpg.add_theme_color(dpg.mvThemeCol_TextDisabled, (100, 100, 100, 255))
                dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (5, 5, 5, 255))

    # MARK: Initialize Viewport

    def initialize_viewport(self) -> None:
        """Set up Dear PyGui context, fonts, and viewport."""

        dpg.create_context()
        with dpg.font_registry():
            font_path = str(Path(__file__).parent / FONT)
            with dpg.font(file=font_path, size=18) as default_font:
                dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
            dpg.bind_font(default_font)
        dpg.create_viewport(
            title="TWELITE STICK", width=620, height=440, resizable=True
        )
        dpg.set_viewport_vsync(False)
        dpg.setup_dearpygui()
        dpg.show_viewport()

    def create_windows(self) -> None:
        """Create and configure all Dear PyGui windows and their contents."""

        # MARK: Panel Window
        with dpg.window(
            no_title_bar=True,
            no_close=True,
            no_collapse=True,
            no_move=True,
            no_resize=True,
            pos=(10, 10),
            width=600,
            height=100,
        ):
            ports = mw.utils.get_ports()
            with dpg.group(horizontal=True):
                with dpg.child_window(width=300):
                    dpg.add_text("Serial Port")
                    dpg.add_combo(
                        items=ports,
                        default_value=ports[0],
                        tag="ports_combo",
                        width=280,
                        enabled=True,
                        callback=self.on_select_port,
                    )
                with dpg.child_window(width=150):
                    dpg.add_text("Transmission")
                    with dpg.group(horizontal=True):
                        dpg.add_button(
                            label="Connect",
                            tag="start_button",
                            enabled=True,
                            callback=self.on_start,
                        )
                        dpg.add_button(
                            label="Disconnect",
                            tag="stop_button",
                            enabled=False,
                            callback=self.on_stop,
                        )

        # MARK: Input Window
        with dpg.window(
            label="TWELITE DIP Input",
            no_close=True,
            # no_resize=True,
            pos=(10, 120),
            width=600,
            height=160,
            tag="window_twelite_dip_input",
        ):
            with dpg.group(horizontal=False):
                # DOx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        with dpg.group(horizontal=True):
                            with dpg.drawlist(
                                width=20, height=20, tag=f"indicator_do{p}"
                            ):
                                dpg.draw_circle(
                                    center=(10, 10),
                                    radius=10,
                                    fill=(0, 0, 0, 255),
                                    tag=f"circle_do{p}",
                                )
                            dpg.add_text(f"DO{p}")
                dpg.add_spacer()
                dpg.add_separator()
                dpg.add_spacer()
                # PWMx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        with dpg.plot(
                            label=f"PWM{p}",
                            height=80,
                            width=140,
                            tag=f"plot_pwm{p}",
                            no_frame=True,
                            no_mouse_pos=True,
                        ):
                            dpg.add_plot_axis(
                                dpg.mvXAxis,
                                tag=f"plot_pwm{p}_x",
                                no_tick_labels=True,
                            )
                            with dpg.plot_axis(
                                dpg.mvYAxis,
                                tag=f"plot_pwm{p}_y",
                                no_tick_labels=True,
                                no_tick_marks=True,
                                no_gridlines=True,
                            ):
                                dpg.add_line_series(
                                    [],
                                    [],
                                    tag=f"series_pwm{p}",
                                    parent=f"plot_pwm{p}_y",
                                )

        # MARK: Output Window
        with dpg.window(
            label="TWELITE DIP Output",
            no_close=True,
            no_resize=True,
            pos=(10, 290),
            width=600,
            height=140,
            tag="window_twelite_dip_output",
        ):
            with dpg.group(horizontal=False):
                # DIx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        dpg.add_checkbox(
                            label=f"DI{p}",
                            tag=f"checkbox_di{p}",
                            enabled=False,
                            callback=self.on_check_di,
                            user_data=p,
                        )
                dpg.add_spacer()
                dpg.add_separator()
                dpg.add_spacer()
                # AIx
                with dpg.group(horizontal=False):
                    with dpg.group(horizontal=True):
                        for p in range(1, 5):
                            with dpg.group(horizontal=False):
                                dpg.add_text(f"AI{p}")
                                dpg.add_slider_int(
                                    label="",
                                    width=80,
                                    default_value=0,
                                    min_value=0,
                                    max_value=2000,
                                    tag=f"slider_ai{p}",
                                    enabled=False,
                                    callback=self.on_change_ai,
                                    user_data=p,
                                )

    def update_state(self, new_state: State) -> None:
        """Update the UI and internal state based on the application's current status."""

        # MARK: UI State
        self.state = new_state
        match new_state:
            case self.State.IDLE:
                dpg.configure_item("ports_combo", enabled=True)
                dpg.configure_item("start_button", enabled=True)
                dpg.configure_item("stop_button", enabled=False)
                for p in range(1, 5):
                    dpg.configure_item(f"checkbox_di{p}", enabled=False)
                    dpg.configure_item(f"slider_ai{p}", enabled=False)
                dpg.bind_item_theme("window_twelite_dip_input", self.theme_disabled)
                dpg.bind_item_theme("window_twelite_dip_output", self.theme_disabled)
            case self.State.CONNECTING:
                dpg.configure_item("ports_combo", enabled=False)
                dpg.configure_item("start_button", enabled=False)
                dpg.configure_item("stop_button", enabled=True)
                for p in range(1, 5):
                    dpg.configure_item(f"checkbox_di{p}", enabled=True)
                    dpg.configure_item(f"slider_ai{p}", enabled=False)
                dpg.bind_item_theme("window_twelite_dip_input", 0)
                dpg.bind_item_theme("window_twelite_dip_output", 0)

    # MARK: UI Handlers

    def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle serial port selection from the combo box."""

        self.selected_port = app_data

    def on_start(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle the start button click and initiate communication."""

        self.start()

    def on_stop(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle the stop button click and terminate communication."""

        self.update_state(self.State.IDLE)
        self.twelite.close()

    def on_check_di(self, sender: Any, app_data: bool, user_data: int) -> None:
        """Handle checkbox state changes for DI output control."""

        if 1 <= user_data <= 4:
            self.command_app_twelite.di_state[user_data - 1] = app_data
            self.twelite.send(self.command_app_twelite)

    def on_change_ai(self, sender: Any, app_data: int, user_data: int) -> None:
        """Handle slider changes for AI values and send PWM updates."""

        if 1 <= user_data <= 4:
            self.command_app_twelite.pwm_duty[user_data - 1] = app_data * 1024 // 2000
            self.twelite.send(self.command_app_twelite)

    # MARK: Packet handler

    def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        """Update the GUI based on incoming TWELITE DIP packet data."""

        now = perf_counter()
        for p in range(1, 5):
            # DO
            p_state = packet.di_state[p - 1]
            match p:
                case 1:  # RED
                    color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                case 2:  # GREEN
                    color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 3:  # YELLOW
                    color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 4:  # BLUE
                    color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
            dpg.configure_item(f"circle_do{p}", fill=color)
            # PWM
            self.pwm_data[p] = packet.ai_voltage[p - 1]
            self.pwm_plot_data[p].append((now, self.pwm_data[p]))

    # MARK: Plot

    def update_plot(self) -> None:
        """Update the plot with the latest PWM data."""

        now = perf_counter()
        # Throttle plot updates to PLOT_UPDATE_INTERVAL
        if now - self.last_plot < PLOT_UPDATE_INTERVAL:
            return
        self.last_plot = now
        for p in range(1, 5):
            # If no new value was added recently, duplicate the last value
            if self.pwm_plot_data[p]:
                last_time, last_val = self.pwm_plot_data[p][-1]
                if now - last_time > PLOT_UPDATE_INTERVAL:
                    self.pwm_plot_data[p].append((now, last_val))
            else:
                self.pwm_plot_data[p].append((now, self.pwm_data[p]))
            # Remove old values older than PLOT_X_LIMIT
            while (
                self.pwm_plot_data[p]
                and now - self.pwm_plot_data[p][0][0] > PLOT_X_LIMIT
            ):
                self.pwm_plot_data[p].popleft()
            x_series, y_series = (
                zip(*list(self.pwm_plot_data[p])) if self.pwm_plot_data[p] else ([], [])
            )
            if x_series and y_series:
                dpg.set_value(f"series_pwm{p}", [x_series, y_series])
                dpg.set_axis_limits(f"plot_pwm{p}_x", x_series[0], x_series[-1])
                dpg.set_axis_limits(f"plot_pwm{p}_y", 0, 2000)

    # MARK: Initialize Variables

    def initialize(self) -> None:
        """Initialize internal command structure and default values."""

        ports = mw.utils.get_ports()
        self.selected_port = ports[0]  # default

        command_app_twelite_initial: dict[str, Any] = {
            "destination_logical_id": 0x78,  # All child devices
            "di_to_change": [True, True, True, True],  # Enable DI1-4
            "di_state": [False, False, False, False],  # Initial state of DIx
            "pwm_to_change": [True, True, True, True],  # Enable AI1-4
            "pwm_duty": [0, 0, 0, 0],  # Initial state of AIx
        }
        self.command_app_twelite = mw.serializers.app_twelite.Command(
            **command_app_twelite_initial
        )

        self.pwm_data = {1: 0, 2: 0, 3: 0, 4: 0}
        self.pwm_plot_data = {1: deque(), 2: deque(), 3: deque(), 4: deque()}
        self.last_plot = 0.0

    # MARK: Start

    def start(self) -> None:
        """Start TWELITE communication and register listeners."""

        # Create a twelite object
        self.twelite = mw.Twelite(self.selected_port)
        # Use JST for received data
        self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
        # Register event handler(s)
        self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)

        self.update_state(self.State.CONNECTING)

    # MARK: loop

    def loop(self) -> None:
        """Perform periodic updates while the application is running."""

        match self.state:
            case self.State.IDLE:
                pass
            case self.State.CONNECTING:
                self.twelite.update()
                self.update_plot()
        dpg.render_dearpygui_frame()

    # MARK: Show

    def show(self) -> None:
        """Main execution loop of the application."""

        try:
            while dpg.is_dearpygui_running():
                self.loop()
        except KeyboardInterrupt:
            pass
        finally:
            print("Quit...")
            self.close()

    # MARK: Lifecycle

    def __enter__(self) -> Self:
        """Enter the runtime context related to this object."""

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        """Exit the runtime context and close the connection."""

        self.close()
        return None

    def __del__(self) -> None:
        """Destructor to ensure resources are cleaned up."""

        self.close()

    def close(self) -> None:
        """Cleanly shut down TWELITE connection and Dear PyGui context."""

        if self.state == self.State.CONNECTING:
            self.twelite.close()
        dpg.destroy_context()


# MARK: Entry point

if __name__ == "__main__":
    with MainViewport() as viewport:
        viewport.show()

Running the Script

Run the script.


python stick_dip_gui.py
  1. Select the serial port (e.g., COM3, /dev/tty.usb*) from the combo box labeled “Serial Port”
  2. Click the “Connect” button under “Transmission” to start communication
  3. Change the inputs of the TWELITE DIP and observe changes in the “TWELITE DIP Input” window
  4. Adjust the checkboxes and sliders in the “TWELITE DIP Output” window to modify outputs of the TWELITE DIP
  5. Click the “Disconnect” button to stop communication
  6. Close the window
Example of screen display

Example of screen display

If the communication is successful, the state of the TWELITE DIP will be displayed in real time, and its outputs can be modified at any time.

Script Overview

The initialization of Dear PyGui is handled by initialize_viewport().

This function sets up the font file and defines the size of the OS window (referred to as the Viewport).

    def initialize_viewport(self) -> None:
        """Set up Dear PyGui context, fonts, and viewport."""
        ...

The interface within the OS window is defined in create_windows().

Child windows on the screen are created using dpg.window(), and components are defined within them.

By assigning a tag to a component, you can modify its values and attributes from the program (similar to the id= in HTML).

    def create_windows(self) -> None:
        """Create and configure all Dear PyGui windows and their contents."""
        ...

Buttons, combo boxes, sliders, and checkboxes that accept user input can register a callback.

    def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle serial port selection from the combo box."""
        ...

As with other scripts, data received from a child TWELITE is handled by an event handler.

In on_app_twelite(), data to be reflected in DOx is registered immediately using tag, and data for PWMx is passed to update_plot() via member variables.

    def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        """Update the GUI based on incoming TWELITE DIP packet data."""
        ...

When the “Connect” button is pressed, start() is called.

This function initializes mwings.Twelite and registers the receive handler.

    def start(self) -> None:
        """Start TWELITE communication and register listeners."""
        ...

Because Dear PyGui is built on a low-level API, the application controls frame rendering directly, allowing you to define your own rendering loop. While the OS window is open, loop() is repeatedly called.

Once connected to the serial port, loop() performs the following three tasks:

    def loop(self) -> None:
        """Perform periodic updates while the application is running."""
        ...

The entire application runs when show() is called.

    def show(self) -> None:
        """Main execution loop of the application."""
        ...

The program exits the rendering loop either when dpg.is_dearpygui_running() becomes False after closing the window, or upon receiving a KeyboardInterrupt (Ctrl+C), and then cleans up mwings.Twelite and dearpygui.