セクションの複数ページをまとめています。 印刷またはPDF形式で保存...

もとのページに戻る

2025-08-01 現在

TWELITE STICK

USBドングルでTWELITEをPCを連携させる
USBドングル TWELITE STICK は、TWELITE の本体とアンテナ、TWELITE R3 の機能を一つのケースに収めた製品です。MONOSTICK シリーズの後継として、TWELITE と PC を連携させたいときに活躍します。
TWELITE STICK

TWELITE STICK

USBドングル

TWELITE STICK は、TWELITE の本体とアンテナに、USB アダプター TWELITE R シリーズの機能を組み合わせた製品です。

TWELITE STICK の構成

TWELITE STICK の構成

ほかの TWELITE から受信したデータを PC へ送信したり、PC から受信したデータをほかの TWELITE へ送信したり、USB 電源に接続して TWELITE のパケットを中継することもできます。

MONOSTICK との違い

TWELITE STICK は従来製品の MONOSTICK と同じパケットを扱うことができ、上位互換に相当します。

一般的な USB メモリと遜色ない大きさに小型化したことで、隣の USB ポートとの干渉を起こしづらくなっています。

MONOSTICK と TWELITE STICK の大きさの比較

MONOSTICK と TWELITE STICK の大きさの比較

また、TWELITE 本体を第二世代の GOLD シリーズとしたことで、従来の RED シリーズよりも受信待機時の消費電流を約10mA削減しながら、同等の送信出力と僅かに上回る受信感度を実現しました。

TWELITE STICK の受信待機→送信時の消費電流(再送4回中継、LED無効)

TWELITE STICK の受信待機→送信時の消費電流
(再送4回中継、LED無効)

MONOSTICK RED の消費電流の受信待機→送信時の消費電流(再送4回中継)

MONOSTICK RED の消費電流の受信待機→送信時の消費電流
(再送4回中継)

TWELITE APPS

工場出荷時の TWELITE STICK には TWELITE APPS 統合版 をインストールしています。TWELITE APPS 統合版は、TWELITE GOLD シリーズのプログラム容量を活かして、従来の機能を一つのファームウェアに収めたものです。

親機・中継機アプリ(App_Wings)シリアル通信アプリ(App_Uart)OTA設定アプリ(CUE / ARIA
子機との通信や、通信の中継UARTに特化した無線通信無線経由の設定変更

インタラクティブモードの操作を行うことで、ファームウェアの書き換えを行わずに、これらの機能を瞬時に切り替えて使うことができます。一部のアプリでは、状態に応じてロゴマークの点灯色が変化するほか、パケットの受信を受けて点滅します。

親機中継機子機
マゼンタ

マゼンタ

イエロー

イエロー

シアン

シアン

シンプルな無線通信

TWELITE は、起動直後から通信できます。Bluetooth のようなペアリングを行いません。

周波数チャネルを合わせた端末同士で同報通信を行います。したがって、同一の周波数チャネルでは、同時に複数の端末が送信を行うことはできません。自分宛でないパケットは無視します。トランシーバやインカムの動作をイメージすると分かりやすいかもしれません。

親機や中継機は、同じ周波数チャネルの端末から送信されるパケットを待ち受けます。

データ量の多い通信には適していないものの、単純な信号伝送といった用途には最適です。

1 - TWELITE STAGE APP による動作確認と設定

TWELITE STAGE APP を使って通信動作の確認や設定変更を行う方法
評価開発環境 TWELITE STAGE SDK に含まれる TWELITE STAGE APP を使うことで、通信動作の確認や設定変更を行うことができます。

基本的な動作確認

TWELITE STICK を接続

PC の USB ポートに TWELITE STICK を接続してください。

工場出荷時の TWELITE は、親機・中継機アプリの親機モードに設定されています。TWELITE シリーズの子機との間で送受信を行うことができ、ロゴマークはマゼンタに光るはずです。

工場出荷時の様子

工場出荷時の様子

起動メッセージを確認

TWELITE STICK の動作を確かめるために、まずは起動時に出力されるメッセージを確認してください。

TWELITE STICK を接続した状態で、MWSTAGEフォルダ内のTWELITE_Stageを実行します。シリアルポート選択の画面に、TWELITE STICKが表示されます。

シリアルポート選択

シリアルポート選択

TWELITE STICK を選択すると、メインメニューに移行します。

メインメニュー

メインメニュー

ビューアを選択すると、ビューア選択のメニューに移行します。

ビューア

ビューア

ターミナルを選択すると、一般的なターミナルソフトと同様に VT-100 互換のターミナル画面を表示します。

ターミナル画面

ターミナル画面

初期状態では、上記のように TWELITE STICK が受信したパケットのデータを出力しているかもしれません。

右下にあるファーム書換/リセットを長押しして、TWELITE をリセットしてください。

右下を長押しでリセット

右下を長押しでリセット

起動メッセージが出力されます。

起動メッセージの出力例

起動メッセージの出力例

上記の例では、次のメッセージを出力しています。

!INF MW APP_WINGS(Parent) v1-03-2, SID=0x8300051A

このように !INF MW APP_WINGS(Parent) と表示されていれば、親機・中継機アプリ(App_Wings)の親機モードが動作していることが分かります。

TWELITE DIP の子機と通信

ここでは、TWELITE DIP の子機との通信動作を確認する方法をご紹介します。

TWELITE DIP は、工場出荷時に書き込まれている 超簡単!標準アプリ(App_Twelite)が動作しているものとします。

TWELITE DIP の準備

TWELITE DIP の子機の DI1GND の間にスイッチを、DO1VCC の間に LED を接続してください。子機は超簡単!標準アプリの端末どうしの通信を確認する場合と同じ構成です。

子機の配線例

子機の配線例

TWELITE DIP の親機の代わりに TWELITE STICK を使うことで、ボタンの状態を PC 側で検知したり、LED を PC から光らせることができます。

TWELITE STICK の接続

TWELITE STICK を PC の USBポートに接続してください。

親機・中継機アプリの親機モードに設定されていれば、マゼンタに光ります。

親機モードの待機状態

親機モードの待機状態

端末の設定

端末の設定変更は不要です。工場出荷時の状態のまま、すぐに通信を開始できます。

しかし、ここでは念のため設定値を確認しておきましょう。

TWELITE シリーズの設定変更を UART 通信によって行うには、端末をインタラクティブモードで起動します。TWELITE STAGE APP には、インタラクティブモードを扱うための機能があります。

メインメニューからインタラクティブモードを選択します。

メインメニュー

メインメニュー

次の画面では、任意の場所をクリックするかEnterキーを押すことで次に進みます。

確認画面

確認画面

以下のような画面が表示されたら成功です。

インタラクティブモード

インタラクティブモード

物理的にネットワークを分離する周波数チャネルの値が18、論理的にネットワークを分離するアプリケーションIDの値が67720102となっていれば、工場出荷時の TWELITE DIP と通信できます。

ロゴマークの点灯

TWELITE STICK を PC に接続した状態で、TWELITE DIP の DI1 に接続したスイッチを押してください。

正常に動作していれば、ボタンを押している間はより明るく、赤色に光ります。

DI1を押すと赤く点灯する

DI1を押すと赤く点灯する

標準アプリビューア

TWELITE STAGE APP には、超簡単!標準アプリの子機から受信したデータを表示する機能があります。

DI1へ接続したスイッチの状態を取得しましょう。

メインメニューからビューアを選択します。

メインメニュー

メインメニュー

ビューア選択の画面から、標準アプリ ビューアを選んでください。

ビューア選択

ビューア選択

標準アプリビューアの画面では、TWELITE STICK が受信した最新の超簡単!標準アプリからのデータを表示します。

標準アプリビューア

標準アプリビューア

TWELITE DIP の DI1 に接続したスイッチを押すと、D1の項目が赤く光ります。

標準アプリコマンダー

TWELITE STAGE APP には、超簡単!標準アプリの子機へデータを送信する機能もあります。DO1へ接続した LED を制御してみましょう。

メインメニューからビューアを開きます。

メインメニュー

メインメニュー

ビューア選択の画面から、コマンダーを選んでください。

ビューア選択

ビューア選択

App_Twelite 0x80コマンドを選びます。

標準アプリのコマンダー

標準アプリのコマンダー

DI1(1)をクリックするか、1キーを押すことでDI1の点灯状態を制御できます。

コマンダーの画面

コマンダーの画面

TWELITE ARIA と通信

ここでは、温湿度センサータグ TWELITE ARIA との通信動作を確認する方法をご紹介します。

TWELITE ARIA は、工場出荷時に書き込まれている アリアアプリ(App_ARIA)が動作しているものとします。

TWELITE ARIA の準備

TWELITE ARIA に CR2032 電池を挿入してください。

CR2032電池を挿入

CR2032電池を挿入

TWELITE ARIA の親機に TWELITE STICK を使うことで、温湿度や磁石が接近した情報を PC 側で取得できます。

TWELITE STICK の接続

TWELITE STICK を PC の USBポートに接続してください。

親機・中継機アプリの親機モードに設定されていれば、マゼンタに光ります。

親機モードの待機状態

親機モードの待機状態

端末の設定

端末の設定変更は不要です。工場出荷時の状態のまま、すぐに通信を開始できます。

物理的にネットワークを分離する周波数チャネルの値が18、論理的にネットワークを分離するアプリケーションIDの値が67720102となっていれば、工場出荷時の TWELITE ARIA と通信できます。

ロゴマークの点灯

TWELITE STICK を PC に接続した状態で、TWELITE ARIA のホールセンサに磁石を近づけてください。

正常に動作していれば、TWELITE ARIA は磁石を近づけるたびにパケットを送信し、TWELITE STICK のロゴマークがそれを受けて点滅します。

簡易モニタ

TWELITE STAGE APP には、TWELITE ARIA から受信したデータをシンプルに表示する機能があります。

メインメニューからビューアを選択します。

メインメニュー

メインメニュー

ビューア選択の画面から、簡易モニタ(CUE/ARIA/Glancer)を選んでください。

ビューア選択

ビューア選択

続いて、TWELITE ARIA の簡易モニタを選択します。

簡易モニタ選択画面

簡易モニタ選択画面

簡易モニタの画面では、TWELITE STICK が受信した最新の TWELITE ARIA のデータを表示します。

簡易モニタ

簡易モニタ

TWELITE ARIA に磁石を近づけると、新たなパケットが届きます。

センサーグラフ

TWELITE STAGE APP には、子機から受信した温湿度データなどをグラフとして表示する機能もあります。

メインメニューからビューアを開きます。

メインメニュー

メインメニュー

ビューア選択の画面から、グラフ表示(加速度リアルタイム/センサー)を選んでください。

ビューア選択

ビューア選択

続いて、センサーグラフを選択します。

グラフ選択画面

グラフ選択画面

右側のノード一覧から、ARAで始まるものを選択してください。

ノード選択

ノード選択

初期状態では、24時間分のデータが表示されます。直近のデータを確認するために、[ライブ>>]をクリックしてください。

24時間データ

24時間データ

直近のデータが表示されます。磁石を近づける、7Pインタフェース近くのセンサを温めるなどして、値が変化する様子を確認してください。

ライブデータ

ライブデータ

2 - Python による子機との通信(基本編)

Python を使って子機との通信を行う方法
専用のライブラリを使うことで、Python から TWELITE STICK を通して TWELITE の子機と通信することができます。

基本的な動作確認

TWELITE STICK を接続

PC の USB ポートに TWELITE STICK を接続してください。

工場出荷時の TWELITE は、親機・中継機アプリの親機モードに設定されています。TWELITE シリーズの子機との間で送受信を行うことができ、ロゴマークはマゼンタに光るはずです。

工場出荷時の様子

工場出荷時の様子

MWings ライブラリの導入

Python 3.12 以降の環境を用意してください。

親機・中継機アプリの出力を解釈するための MWings モジュール を導入します。


pip install mwings

MWings モジュールは、ホストへ接続された TWELITE の親機を通じて、TWELITE の子機と通信できます。

  • 親機から受信したデータを解釈して、辞書や JSON のほか pandas データフレームへ変換※
  • 辞書から生成したコマンドを親機へ送信

※ Lite 版は pandas に非対応

データの受信を確認

TWELITE STICK が受信したデータを解釈できることを確認します。

GitHubで公開しているサンプルスクリプト rx_print_json.py を実行してください。

TWELITE STICK が受信した全種類のパケットの内容を、ターミナルに出力される JSON 形式の文字列によって確認できます。

# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black

# MWings example: Receive data, print JSON, typed

from zoneinfo import ZoneInfo

import mwings as mw


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_ARIA)
    def on_app_aria(packet: mw.parsers.app_aria.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_CUE)
    def on_app_cue(packet: mw.parsers.app_cue.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_CUE_PAL_EVENT)
    def on_app_cue_pal_event(packet: mw.parsers.app_cue_pal_event.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_IO)
    def on_app_io(packet: mw.parsers.app_io.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_TWELITE)
    def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_PAL_AMB)
    def on_app_pal_amb(packet: mw.parsers.app_pal_amb.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_PAL_MOT)
    def on_app_pal_mot(packet: mw.parsers.app_pal_mot.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_PAL_OPENCLOSE)
    def on_app_pal_openclose(packet: mw.parsers.app_pal_openclose.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_UART_ASCII)
    def on_app_uart_ascii(packet: mw.parsers.app_uart_ascii.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.APP_UART_ASCII_EXTENDED)
    def on_app_uart_ascii_extended(
        packet: mw.parsers.app_uart_ascii_extended.ParsedPacket,
    ) -> None:
        print(packet.to_json(verbose=False, spread=True))

    @twelite.on(mw.common.PacketType.ACT)
    def on_act(packet: mw.parsers.act.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

例えば、前項のTWELITE STAGE APP の場合と同様に、工場出荷時の TWELITE DIP(超簡単!標準アプリ)の子機を用意していたなら、次のような出力を確認できるはずです。終了するには Ctrl+C を入力してください。

{
  "time_parsed": "2025-07-11T11:45:02.067804+09:00",
  "packet_type": "APP_TWELITE",
  "sequence_number": 4713,
  "source_serial_id": "8201007F",
  "source_logical_id": 120,
  "lqi": 166,
  "supply_voltage": 3254,
  "destination_logical_id": 0,
  "relay_count": 0,
  "periodic": true,
  "di_changed_1": false,
  "di_changed_2": false,
  "di_changed_3": false,
  "di_changed_4": false,
  "di_state_1": false,
  "di_state_2": false,
  "di_state_3": false,
  "di_state_4": false,
  "ai_voltage_1": 2000,
  "ai_voltage_2": 2000,
  "ai_voltage_3": 2000,
  "ai_voltage_4": 2000
}

任意のデータを送受信

ここでは、TWELITE STAGE APP の場合と同様に準備した TWELITE DIP の子機との通信に特化したスクリプトを作成してみましょう。

TWELITE DIP 子機の配線例

TWELITE DIP 子機の配線例

TWELITE DIP の DI1 ピンに接続したスイッチの状態表示と、DO1 ピンに接続した LED の制御を目標とします。

超簡単!標準アプリのデータのみ受信

rx_print_json.pyを改変し、超簡単!標準アプリのデータだけを受信するように単純化したスクリプトを作成します。

# -*- coding:utf-8 -*-

from zoneinfo import ZoneInfo

import mwings as mw


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_TWELITE)
    def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(packet.to_json(verbose=False, spread=True))

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

特定の値を取得

これまでのスクリプトでは、to_json()メソッドによって、すべてのデータを JSON 文字列として出力していました。

ここでは、DI1ピンの状態だけをparsers.app_twelite.ParsedPacketdi_state から取得し、仮想的なLEDをターミナルに表示します。

on_app_twelite()ハンドラを以下のように書き換えてください。

def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(f"\rDO1 LED: {"🔴" if packet.di_state[0] else "⚪"}", end='', flush=True)

次のようにして、TWELITE DIP の DI ピンに接続したスイッチを押すたびに赤く光ったら成功です。

表示例

表示例

親機にコマンドを送信

TWELITE STICK が受信したデータを表示するだけではなく、TWELITE STICK からデータを送信することもできます。

スクリプトのメインループでは、threading.Thread.join()を0.5秒おきに呼び出すことで、メインスレッドの終了を検知した際に受信スレッドも終了できるようにしています。

        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)

この仕組みを利用して、メインループから TWELITE DIP の DO1 ピンを制御し、0.5秒おきに LED を点滅させてみましょう。

先のスクリプトを、次のように改変してください。これまでの変更も適用した全体を示します。

# -*- coding:utf-8 -*-

from zoneinfo import ZoneInfo
from typing import Any

import mwings as mw


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_TWELITE)
    def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        print(f"\rDO1 LED: {"🔴" if packet.di_state[0] else "⚪"}", end='', flush=True)

    # Initialize command
    initial: dict[str, Any] = {
        "destination_logical_id": 0x78,  # All child devices
        "di_to_change": [True, False, False, False],  # Enable DI1
        "di_state": [False, False, False, False],  # Initial state of DIx
    }
    command = mw.serializers.app_twelite.Command(**initial)

    # Toggle the DI1 state
    def toggle_di1() -> None:
        command.di_state[0] = not command.di_state[0]
        twelite.send(command)

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            toggle_di1() # Send
            twelite.join(0.5) # Receive
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

このスクリプトを実行すると、先ほどと同様に仮想的なLEDを表示します。

表示例

表示例

その一方で、TWELITE DIP の DO1 ピンに接続した LED は点滅し続けるはずです。スクリプトを終了すると、点滅は止まります。

2.1 - Python による子機との通信(ウェブサーバIoT編)

Python を使って子機のデータをウェブサーバへ送信する
実用に近い応用編として、ウェブサーバを利用した IoT システムを構築します。

TWELITE ARIA の温湿度データを収集

温湿度センサータグ TWELITE ARIA から受信した温湿度データをウェブサーバへ送信し、ページ上のグラフに表示する簡単な IoT システムを構築してみましょう。

ThingSpeakの表示例

ThingSpeakの表示例

TWELITE DIP を対象とした基本編のスクリプトでは DI1DO1 の簡単な操作に留めましたが、実際の IoT システムでは、REST API などを利用して、取得したデータをさらに上位のサーバへ送信する必要があります。

ThingSpeak について

ここでは、MathWorks のサービス ThingSpeakMWings ライブラリを組み合わせて使います。

アカウントの作成

ThingSpeak のサイトへアクセスし、MathWorks のアカウントを作成します。

Channel の作成

“Channel” を作成し、次のように設定します(“Name"や"Description"は適当で構いません)

Channelの設定例

Channelの設定例

API Keys の取得

Channel ページの “API Keys” タブに移動し、16文字の “Write API key” を控えておきます。

TWELITE ARIA の設定と起動

TWELITE ARIA の設定変更

TWELITE ARIA の設定を変更し、TWELITE ARIA モード送信間隔を20秒以上とします(サーバへの過負荷を避けるため)。

TWELITE ARIA の起動

CR2032 電池を投入し、TWELITE ARIA を起動します。

CR2032電池を挿入

CR2032電池を挿入

スクリプトの作成と実行

モジュールの導入

Python 3.12 以降と mwings(またはmwingslite)およびrequestsモジュールを導入します。


pip install mwings requests

スクリプトの作成

後述のスクリプト stick_aria_thingspeak.py を作成してください。mwings.parsers.app_ariaを使ってデータを受信し、requestsモジュールを使って ThingSpeak へ HTTP GET リクエストを送信します。

# -*- coding:utf-8 -*-

from zoneinfo import ZoneInfo
from time import perf_counter

import mwings as mw
import requests


API_KEY = "XXXXXXXXXXXXXXXX"  # Replace with your ThingSpeak API key
BASE_URL = "https://api.thingspeak.com/update"
SEND_MIN_INTERVAL = 20  # Minimum interval in seconds to send data to ThingSpeak


# Main function
def main() -> None:
    # Create a twelite object
    twelite = mw.Twelite(mw.utils.ask_user_for_port())

    # Use JST for received data
    twelite.set_timezone(ZoneInfo("Asia/Tokyo"))

    # Initialize last send time
    last_send_time = perf_counter() - SEND_MIN_INTERVAL

    # Initialize the target serial ID
    target_serial_id = -1

    # Register event handlers
    @twelite.on(mw.common.PacketType.APP_ARIA)
    def on_app_aria(packet: mw.parsers.app_aria.ParsedPacket) -> None:

        # Filter packets by serial ID
        if target_serial_id < 0:
            # Set the serial ID from the received packet
            target_serial_id = packet.source_serial_id
            print(f"Serial ID set to {target_serial_id:08X}")
        elif packet.source_serial_id != target_serial_id:
            # Ignore packets from other serial ID devices
            print(
                f"Ignoring packet from serial ID {packet.source_serial_id:08X}, expected {target_serial_id:08X}"
            )
            return

        # Throttle sending to ThingSpeak
        if perf_counter() - last_send_time < SEND_MIN_INTERVAL:
            print("Skipping send due to minimum interval")
            return  # Skip sending if within the minimum interval
        last_send_time = perf_counter()  # Update last send time

        # Send data to ThingSpeak
        payload = {
            "api_key": API_KEY,
            "field1": f"{packet.temp_100x / 100.0:.2f}",  # Temperature
            "field2": f"{packet.humid_100x / 100.0:.2f}",  # Humidity
            "field3": f"{packet.supply_voltage}",  # Supply voltage (mV)
            "field4": f"{packet.lqi}",  # Link Quality Indicator
        }
        response = requests.get(BASE_URL, params=payload)

        # Check the response status
        if response.status_code == 200:
            print(f"OK: entry ID = {response.text}")
        else:
            print(f"NG: status code = {response.status_code}")

    # Start receiving
    try:
        # Set as daemon thread
        twelite.daemon = True
        # Start the thread, Join to the main thread
        twelite.start()
        print("Started receiving")
        while True:
            twelite.join(0.5)  # Receive
    except KeyboardInterrupt:
        # Stop the thread
        print("Flushing...")
        twelite.stop()
        print("Completed")


if __name__ == "__main__":
    # Call the main function
    main()

スクリプトの実行

スクリプトを実行します。


python stick_aria_thingspeak.py

TWELITE ARIA からのデータを受信し、正常にデータを送信できたなら、次のようにして値の続き番号を示す Entry ID が表示されます。

Started receiving
Serial ID set to 8201C2DC
OK: entry ID = 1
OK: entry ID = 2
OK: entry ID = 3
OK: entry ID = 4
OK: entry ID = 5
...

ThingSpeak の “Private View” タブをクリックしてください。送信されたデータがグラフに表示されるはずです。

初期状態では、Y軸の範囲が動的に変化します。各グラフの✏️ボタンから最大最小値などを設定できます。

約2日分のデータを表示する様子

約2日分のデータを表示する様子

スクリプトの解説

サーバへ送信するデータは、次の部分で構成しています。

# Send data to ThingSpeak
payload = {
    "api_key": API_KEY,
    "field1": f"{packet.temp_100x / 100.0:.2f}",  # Temperature
    "field2": f"{packet.humid_100x / 100.0:.2f}",  # Humidity
    "field3": f"{packet.supply_voltage}",  # Supply voltage (mV)
    "field4": f"{packet.lqi}",  # Link Quality Indicator
}
response = requests.get(BASE_URL, params=payload)

mwings.parsers.app_aria から温湿度とコイン電池の電源電圧、0-255の数値で表現される電波通信品質(LQI)を取得し、文字列に変換したうえで GET リクエストのクエリとしています。

2.2 - Python による子機との通信(グラフィカルUI編)

Python を使って子機と通信するためのグラフィカルUIを備えたアプリケーションを構築する
実用に近い応用編として、簡単なグラフィカルインタフェースを備えたアプリケーションを作成します。

TWELTIE DIP と通信する

TWELITE DIP と通信するアプリケーションを、簡単なユーザインタフェースとともに構築してみましょう。

画面の表示例

画面の表示例

TWELITE DIP を対象とした基本編のスクリプトでは DI1DO1 の簡単な操作に留めましたが、実際にモニタリングを行うためのアプリケーションは、TWELITE STAGE APPのようなユーザインタフェースを必要とします。

Dear PyGui について

ここでは、Dear PyGuiMWings ライブラリを組み合わせて使います。

TWELITE DIP の配線と起動

このアプリケーションでは、TWELITE DIP の DIxAIx に入力された信号を受信すると同時に、DOxPWMx へ信号を出力させます。

VCCGND のほか、以下に示す16のピンを任意に接続してください。

種別ピン1234範囲備考
デジタル入力DIx#15#16#17#180.0V - VCC内部プルアップ
アナログ入力AIx#22#23#24#250.0V - 2.0V超過で無効扱い
デジタル出力DOx#5#8#9#120.0V - VCCLEDカソード接続
PWM出力PWMx#4#6#7#110% - 100%電圧はVCCレベル

スクリプトの作成と実行

モジュールの導入

mwings(またはmwingslite)およびdearpygui モジュールを導入します。


pip install mwings dearpygui

スクリプトの作成

後述のスクリプト stick_dip_gui.py を作成してください。mwings.parsers.app_tweliteを使ってデータを受信し、mwings.serializers.app_tweliteを使ってデータを送信する機能をdearpygui経由で提供します。

# -*- coding:utf-8 -*-

from time import perf_counter
from enum import IntEnum, auto
from zoneinfo import ZoneInfo
from typing import Any, Self, final
from types import TracebackType
from pathlib import Path
from collections import deque

import mwings as mw
import dearpygui.dearpygui as dpg  # type: ignore


PLOT_UPDATE_INTERVAL = 1.0 / 100  # 100Hz
PLOT_X_LIMIT = 5.0  # 3 seconds

FONT = "Mplus1Code-Medium.otf"

# MARK: MainViewport


@final
class MainViewport:
    @final
    class State(IntEnum):
        """Application state enumeration."""

        IDLE = auto()
        CONNECTING = auto()

    selected_port: str
    twelite: mw.Twelite

    command_app_twelite: mw.serializers.app_twelite.Command

    pwm_data: dict[int, int]
    pwm_plot_data: dict[int, deque[tuple[float, int]]]
    last_plot: float

    state: State
    theme_disabled: int

    def __init__(self) -> None:
        """Initialize the main window and prepare all components."""

        self.initialize_viewport()
        self.create_themes()
        self.create_windows()
        self.initialize()
        self.update_state(self.State.IDLE)

    def create_themes(self) -> None:
        """Create a visual theme for disabled UI elements."""

        with dpg.theme() as self.theme_disabled:
            with dpg.theme_component(dpg.mvAll):
                dpg.add_theme_color(dpg.mvThemeCol_Text, (150, 150, 150, 255))
                dpg.add_theme_color(dpg.mvThemeCol_TextDisabled, (100, 100, 100, 255))
                dpg.add_theme_color(dpg.mvThemeCol_FrameBg, (5, 5, 5, 255))

    # MARK: Initialize Viewport

    def initialize_viewport(self) -> None:
        """Set up Dear PyGui context, fonts, and viewport."""

        dpg.create_context()
        with dpg.font_registry():
            font_path = str(Path(__file__).parent / FONT)
            with dpg.font(file=font_path, size=18) as default_font:
                dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
            dpg.bind_font(default_font)
        dpg.create_viewport(
            title="TWELITE STICK", width=620, height=440, resizable=True
        )
        dpg.set_viewport_vsync(False)
        dpg.setup_dearpygui()
        dpg.show_viewport()

    def create_windows(self) -> None:
        """Create and configure all Dear PyGui windows and their contents."""

        # MARK: Panel Window
        with dpg.window(
            no_title_bar=True,
            no_close=True,
            no_collapse=True,
            no_move=True,
            no_resize=True,
            pos=(10, 10),
            width=600,
            height=100,
        ):
            ports = mw.utils.get_ports()
            with dpg.group(horizontal=True):
                with dpg.child_window(width=300):
                    dpg.add_text("シリアルポート")
                    dpg.add_combo(
                        items=ports,
                        default_value=ports[0],
                        tag="ports_combo",
                        width=280,
                        enabled=True,
                        callback=self.on_select_port,
                    )
                with dpg.child_window(width=150):
                    dpg.add_text("シリアル通信")
                    with dpg.group(horizontal=True):
                        dpg.add_button(
                            label="接続",
                            tag="start_button",
                            enabled=True,
                            callback=self.on_start,
                        )
                        dpg.add_button(
                            label="切断",
                            tag="stop_button",
                            enabled=False,
                            callback=self.on_stop,
                        )

        # MARK: Input Window
        with dpg.window(
            label="TWELITE DIP 入力",
            no_close=True,
            # no_resize=True,
            pos=(10, 120),
            width=600,
            height=160,
            tag="window_twelite_dip_input",
        ):
            with dpg.group(horizontal=False):
                # DOx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        with dpg.group(horizontal=True):
                            with dpg.drawlist(
                                width=20, height=20, tag=f"indicator_do{p}"
                            ):
                                dpg.draw_circle(
                                    center=(10, 10),
                                    radius=10,
                                    fill=(0, 0, 0, 255),
                                    tag=f"circle_do{p}",
                                )
                            dpg.add_text(f"DO{p}")
                dpg.add_spacer()
                dpg.add_separator()
                dpg.add_spacer()
                # PWMx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        with dpg.plot(
                            label=f"PWM{p}",
                            height=80,
                            width=140,
                            tag=f"plot_pwm{p}",
                            no_frame=True,
                            no_mouse_pos=True,
                        ):
                            dpg.add_plot_axis(
                                dpg.mvXAxis,
                                tag=f"plot_pwm{p}_x",
                                no_tick_labels=True,
                            )
                            with dpg.plot_axis(
                                dpg.mvYAxis,
                                tag=f"plot_pwm{p}_y",
                                no_tick_labels=True,
                                no_tick_marks=True,
                                no_gridlines=True,
                            ):
                                dpg.add_line_series(
                                    [],
                                    [],
                                    tag=f"series_pwm{p}",
                                    parent=f"plot_pwm{p}_y",
                                )

        # MARK: Output Window
        with dpg.window(
            label="TWELITE DIP 出力",
            no_close=True,
            no_resize=True,
            pos=(10, 290),
            width=600,
            height=140,
            tag="window_twelite_dip_output",
        ):
            with dpg.group(horizontal=False):
                # DIx
                with dpg.group(horizontal=True):
                    for p in range(1, 5):
                        dpg.add_checkbox(
                            label=f"DI{p}",
                            tag=f"checkbox_di{p}",
                            enabled=False,
                            callback=self.on_check_di,
                            user_data=p,
                        )
                dpg.add_spacer()
                dpg.add_separator()
                dpg.add_spacer()
                # AIx
                with dpg.group(horizontal=False):
                    with dpg.group(horizontal=True):
                        for p in range(1, 5):
                            with dpg.group(horizontal=False):
                                dpg.add_text(f"AI{p}")
                                dpg.add_slider_int(
                                    label="",
                                    width=80,
                                    default_value=0,
                                    min_value=0,
                                    max_value=2000,
                                    tag=f"slider_ai{p}",
                                    enabled=False,
                                    callback=self.on_change_ai,
                                    user_data=p,
                                )

    def update_state(self, new_state: State) -> None:
        """Update the UI and internal state based on the application's current status."""

        # MARK: UI State
        self.state = new_state
        match new_state:
            case self.State.IDLE:
                dpg.configure_item("ports_combo", enabled=True)
                dpg.configure_item("start_button", enabled=True)
                dpg.configure_item("stop_button", enabled=False)
                for p in range(1, 5):
                    dpg.configure_item(f"checkbox_di{p}", enabled=False)
                    dpg.configure_item(f"slider_ai{p}", enabled=False)
                dpg.bind_item_theme("window_twelite_dip_input", self.theme_disabled)
                dpg.bind_item_theme("window_twelite_dip_output", self.theme_disabled)
            case self.State.CONNECTING:
                dpg.configure_item("ports_combo", enabled=False)
                dpg.configure_item("start_button", enabled=False)
                dpg.configure_item("stop_button", enabled=True)
                for p in range(1, 5):
                    dpg.configure_item(f"checkbox_di{p}", enabled=True)
                    dpg.configure_item(f"slider_ai{p}", enabled=False)
                dpg.bind_item_theme("window_twelite_dip_input", 0)
                dpg.bind_item_theme("window_twelite_dip_output", 0)

    # MARK: UI Handlers

    def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle serial port selection from the combo box."""

        self.selected_port = app_data

    def on_start(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle the start button click and initiate communication."""

        self.start()

    def on_stop(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle the stop button click and terminate communication."""

        self.update_state(self.State.IDLE)
        self.twelite.close()

    def on_check_di(self, sender: Any, app_data: bool, user_data: int) -> None:
        """Handle checkbox state changes for DI output control."""

        if 1 <= user_data <= 4:
            self.command_app_twelite.di_state[user_data - 1] = app_data
            self.twelite.send(self.command_app_twelite)

    def on_change_ai(self, sender: Any, app_data: int, user_data: int) -> None:
        """Handle slider changes for AI values and send PWM updates."""

        if 1 <= user_data <= 4:
            self.command_app_twelite.pwm_duty[user_data - 1] = app_data * 1024 // 2000
            self.twelite.send(self.command_app_twelite)

    # MARK: Packet handler

    def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        """Update the GUI based on incoming TWELITE DIP packet data."""

        now = perf_counter()
        for p in range(1, 5):
            # DO
            p_state = packet.di_state[p - 1]
            match p:
                case 1:  # RED
                    color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                case 2:  # GREEN
                    color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 3:  # YELLOW
                    color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 4:  # BLUE
                    color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
            dpg.configure_item(f"circle_do{p}", fill=color)
            # PWM
            self.pwm_data[p] = packet.ai_voltage[p - 1]
            self.pwm_plot_data[p].append((now, self.pwm_data[p]))

    # MARK: Plot

    def update_plot(self) -> None:
        """Update the plot with the latest PWM data."""

        now = perf_counter()
        # Throttle plot updates to PLOT_UPDATE_INTERVAL
        if now - self.last_plot < PLOT_UPDATE_INTERVAL:
            return
        self.last_plot = now
        for p in range(1, 5):
            # If no new value was added recently, duplicate the last value
            if self.pwm_plot_data[p]:
                last_time, last_val = self.pwm_plot_data[p][-1]
                if now - last_time > PLOT_UPDATE_INTERVAL:
                    self.pwm_plot_data[p].append((now, last_val))
            else:
                self.pwm_plot_data[p].append((now, self.pwm_data[p]))
            # Remove old values older than PLOT_X_LIMIT
            while (
                self.pwm_plot_data[p]
                and now - self.pwm_plot_data[p][0][0] > PLOT_X_LIMIT
            ):
                self.pwm_plot_data[p].popleft()
            x_series, y_series = (
                zip(*list(self.pwm_plot_data[p])) if self.pwm_plot_data[p] else ([], [])
            )
            if x_series and y_series:
                dpg.set_value(f"series_pwm{p}", [x_series, y_series])
                dpg.set_axis_limits(f"plot_pwm{p}_x", x_series[0], x_series[-1])
                dpg.set_axis_limits(f"plot_pwm{p}_y", 0, 2000)

    # MARK: Initialize Variables

    def initialize(self) -> None:
        """Initialize internal command structure and default values."""

        ports = mw.utils.get_ports()
        self.selected_port = ports[0]  # default

        command_app_twelite_initial: dict[str, Any] = {
            "destination_logical_id": 0x78,  # All child devices
            "di_to_change": [True, True, True, True],  # Enable DI1-4
            "di_state": [False, False, False, False],  # Initial state of DIx
            "pwm_to_change": [True, True, True, True],  # Enable AI1-4
            "pwm_duty": [0, 0, 0, 0],  # Initial state of AIx
        }
        self.command_app_twelite = mw.serializers.app_twelite.Command(
            **command_app_twelite_initial
        )

        self.pwm_data = {1: 0, 2: 0, 3: 0, 4: 0}
        self.pwm_plot_data = {1: deque(), 2: deque(), 3: deque(), 4: deque()}
        self.last_plot = 0.0

    # MARK: Start

    def start(self) -> None:
        """Start TWELITE communication and register listeners."""

        # Create a twelite object
        self.twelite = mw.Twelite(self.selected_port)
        # Use JST for received data
        self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
        # Register event handler(s)
        self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)

        self.update_state(self.State.CONNECTING)

    # MARK: loop

    def loop(self) -> None:
        """Perform periodic updates while the application is running."""

        match self.state:
            case self.State.IDLE:
                pass
            case self.State.CONNECTING:
                self.twelite.update()
                self.update_plot()
        dpg.render_dearpygui_frame()

    # MARK: Show

    def show(self) -> None:
        """Main execution loop of the application."""

        try:
            while dpg.is_dearpygui_running():
                self.loop()
        except KeyboardInterrupt:
            pass
        finally:
            print("Quit...")
            self.close()

    # MARK: Lifecycle

    def __enter__(self) -> Self:
        """Enter the runtime context related to this object."""

        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> bool | None:
        """Exit the runtime context and close the connection."""

        self.close()
        return None

    def __del__(self) -> None:
        """Destructor to ensure resources are cleaned up."""

        self.close()

    def close(self) -> None:
        """Cleanly shut down TWELITE connection and Dear PyGui context."""

        if self.state == self.State.CONNECTING:
            self.twelite.close()
        dpg.destroy_context()


# MARK: Entry point

if __name__ == "__main__":
    with MainViewport() as viewport:
        viewport.show()

スクリプトの実行

スクリプトを実行します。


python stick_dip_gui.py
  1. 「シリアルポート」のコンボボックスから、使用するポート(COM3, /dev/tty.usb*など)を選択
  2. 「シリアル通信」の「接続」ボタンを押して通信を開始
  3. TWELITE DIP の入力を変化させて、「TWELITE DIP 入力」ウィンドウの変化を確認
  4. 「TWELITE DIP 出力」ウィンドウのチェックボックスやスライダーを変化させて、TWELITE DIP の出力の変化を確認
  5. 「シリアル通信」の「切断」ボタンを押して通信を終了
  6. ウィンドウを閉じる
表示例

表示例

正常に通信できていれば、TWELITE DIP の状態をリアルタイムに表示しつつ、TWELITE DIP の出力を任意のタイミングで変更できるはずです。

スクリプトの解説

Dear PyGui の初期化は initialize_viewport() が行っています。

フォントファイルや、OSのウィンドウ(Viewportと呼ぶ)の大きさはここで登録しています。

    def initialize_viewport(self) -> None:
        """Set up Dear PyGui context, fonts, and viewport."""

        dpg.create_context()
        with dpg.font_registry():
            font_path = str(Path(__file__).parent / FONT)
            with dpg.font(file=font_path, size=18) as default_font:
                dpg.add_font_range_hint(dpg.mvFontRangeHint_Japanese)
            dpg.bind_font(default_font)
        dpg.create_viewport(
            title="TWELITE STICK", width=620, height=440, resizable=True
        )
        dpg.set_viewport_vsync(False)
        dpg.setup_dearpygui()
        dpg.show_viewport()

OSのウィンドウ内のインタフェースは、create_windows() に定義しています。

画面内の子ウィンドウは dpg.window()によって定義され、配下のコンポーネントを定義していきます。

コンポーネントには tag を登録することで、値や属性をプログラムから変更できます(HTMLのid=のようなイメージ)。

    def create_windows(self) -> None:
        """Create and configure all Dear PyGui windows and their contents."""

        # MARK: Panel Window
        with dpg.window(
            no_title_bar=True,
            no_close=True,
            no_collapse=True,
            no_move=True,
            no_resize=True,
            pos=(10, 10),
            width=600,
            height=100,
        ):
            ports = mw.utils.get_ports()
            with dpg.group(horizontal=True):
                with dpg.child_window(width=300):
                    dpg.add_text("シリアルポート")
                    dpg.add_combo(
                        items=ports,
                        default_value=ports[0],
                        tag="ports_combo",
                        width=280,
                        enabled=True,
                        callback=self.on_select_port,
                    )
                ...

また、入力を受け付けるボタンやコンボボックス、スライダーやチェックボックスでは、callback を登録できます。

    def on_select_port(self, sender: Any, app_data: str, user_data: Any) -> None:
        """Handle serial port selection from the combo box."""

        self.selected_port = app_data

ほかのスクリプトと同様に、TWELITE の子機から受信したデータは、イベントハンドラで受け取ることができます。

on_app_twelite() では、DOxに反映するデータを tag によって即座に登録し、PWMxに反映するデータはメンバ変数を通じて update_plot() へ渡しています。

    def on_app_twelite(self, packet: mw.parsers.app_twelite.ParsedPacket) -> None:
        """Update the GUI based on incoming TWELITE DIP packet data."""

        now = perf_counter()
        for p in range(1, 5):
            # DO
            p_state = packet.di_state[p - 1]
            match p:
                case 1:  # RED
                    color = (255, 0, 0, 255) if p_state else (0, 0, 0, 255)
                case 2:  # GREEN
                    color = (0, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 3:  # YELLOW
                    color = (255, 255, 0, 255) if p_state else (0, 0, 0, 255)
                case 4:  # BLUE
                    color = (0, 0, 255, 255) if p_state else (0, 0, 0, 255)
            dpg.configure_item(f"circle_do{p}", fill=color)
            # PWM
            self.pwm_data[p] = packet.ai_voltage[p - 1]
            self.pwm_plot_data[p].append((now, self.pwm_data[p]))

「開始」ボタンを押したときは、start() が呼び出されます。

ここで mwings.Twelite を初期化し、受信ハンドラを登録しています。

    def start(self) -> None:
        """Start TWELITE communication and register listeners."""

        # Create a twelite object
        self.twelite = mw.Twelite(self.selected_port)
        # Use JST for received data
        self.twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
        # Register event handler(s)
        self.twelite.add_listener(mw.common.PacketType.APP_TWELITE, self.on_app_twelite)

        self.update_state(self.State.CONNECTING)

Dear PyGui は低レベルな API を使って構築されているため、各フレームの描画をアプリケーションから指示し、描画ループを自前で定義することができます。OS のウィンドウが表示されている間、loop()は無限に呼び出されます。

シリアルポートへ接続したのち、loop() が行う処理は3つあります。

    def loop(self) -> None:
        """Perform periodic updates while the application is running."""

        match self.state:
            case self.State.IDLE:
                pass
            case self.State.CONNECTING:
                self.twelite.update()
                self.update_plot()
        dpg.render_dearpygui_frame()

画面全体の処理は、show() を呼び出すことで開始されます。

    def show(self) -> None:
        """Main execution loop of the application."""

        try:
            while dpg.is_dearpygui_running():
                self.loop()
        except KeyboardInterrupt:
            pass
        finally:
            print("Quit...")
            self.close()

プログラムは、画面を閉じる dpg.is_dearpygui_running()== False か、Ctrl+Cを入力する KeyboardInterrupt ことで描画ループを抜け、mwings.Twelitedearpygui のクローズ処理を経て終了します。