Python による子機との通信(ウェブサーバIoT編)
Python を使って子機のデータをウェブサーバへ送信する
PC の USB ポートに TWELITE STICK を接続してください。
工場出荷時の TWELITE は、親機・中継機アプリの親機モードに設定されています。TWELITE シリーズの子機との間で送受信を行うことができ、ロゴマークはマゼンタに光るはずです。
工場出荷時の様子
Python 3.12 以降の環境を用意してください。
親機・中継機アプリの出力を解釈するための MWings モジュール を導入します。
pip install mwings
MWings モジュールは、ホストへ接続された TWELITE の親機を通じて、TWELITE の子機と通信できます。
※ Lite 版は pandas に非対応
詳しい導入方法や機能の詳細については、MWings for Python マニュアル をご覧ください。
モダンなPython開発においては、Pythonのバージョンとプロジェクト単位の依存関係の管理が必要とされます。上記のマニュアルでは、pyenv と poetry を使った環境整備の方法を紹介しています。
Raspberry Pi の場合は、Lite 版 の mwingslite
モジュールを使用してください。
pandasへの依存を廃したバージョンであり、numpyの競合による問題を避けることができます。データフレームの出力は行えませんが、辞書形式やJSON文字列の出力機能は残しています。
軽量であるため、通常のPCであっても pandas を必要としない場合は、こちらをご利用ください。
TWELITE STICK が受信したデータを解釈できることを確認します。
GitHubで公開しているサンプルスクリプト rx_print_json.py
を実行してください。
TWELITE STICK が受信した全種類のパケットの内容を、ターミナルに出力される JSON 形式の文字列によって確認できます。
# -*- coding:utf-8 -*-
# Written for Python 3.12
# Formatted with Black
# MWings example: Receive data, print JSON, typed
from zoneinfo import ZoneInfo
import mwings as mw
# Main function
def main() -> None:
# Create a twelite object
twelite = mw.Twelite(mw.utils.ask_user_for_port())
# Use JST for received data
twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
# Register event handlers
@twelite.on(mw.common.PacketType.APP_ARIA)
def on_app_aria(packet: mw.parsers.app_aria.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_CUE)
def on_app_cue(packet: mw.parsers.app_cue.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_CUE_PAL_EVENT)
def on_app_cue_pal_event(packet: mw.parsers.app_cue_pal_event.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_IO)
def on_app_io(packet: mw.parsers.app_io.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_TWELITE)
def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_PAL_AMB)
def on_app_pal_amb(packet: mw.parsers.app_pal_amb.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_PAL_MOT)
def on_app_pal_mot(packet: mw.parsers.app_pal_mot.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_PAL_OPENCLOSE)
def on_app_pal_openclose(packet: mw.parsers.app_pal_openclose.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_UART_ASCII)
def on_app_uart_ascii(packet: mw.parsers.app_uart_ascii.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.APP_UART_ASCII_EXTENDED)
def on_app_uart_ascii_extended(
packet: mw.parsers.app_uart_ascii_extended.ParsedPacket,
) -> None:
print(packet.to_json(verbose=False, spread=True))
@twelite.on(mw.common.PacketType.ACT)
def on_act(packet: mw.parsers.act.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
# Start receiving
try:
# Set as daemon thread
twelite.daemon = True
# Start the thread, Join to the main thread
twelite.start()
print("Started receiving")
while True:
twelite.join(0.5)
except KeyboardInterrupt:
# Stop the thread
print("Flushing...")
twelite.stop()
print("Completed")
if __name__ == "__main__":
# Call the main function
main()
例えば、前項のTWELITE STAGE APP の場合と同様に、工場出荷時の TWELITE DIP(超簡単!標準アプリ)の子機を用意していたなら、次のような出力を確認できるはずです。終了するには Ctrl+C
を入力してください。
{
"time_parsed": "2025-07-11T11:45:02.067804+09:00",
"packet_type": "APP_TWELITE",
"sequence_number": 4713,
"source_serial_id": "8201007F",
"source_logical_id": 120,
"lqi": 166,
"supply_voltage": 3254,
"destination_logical_id": 0,
"relay_count": 0,
"periodic": true,
"di_changed_1": false,
"di_changed_2": false,
"di_changed_3": false,
"di_changed_4": false,
"di_state_1": false,
"di_state_2": false,
"di_state_3": false,
"di_state_4": false,
"ai_voltage_1": 2000,
"ai_voltage_2": 2000,
"ai_voltage_3": 2000,
"ai_voltage_4": 2000
}
mwings.parsers.app_twelite.ParsedPacket
キー | 値 |
---|---|
time_parsed | 受信時刻(デフォルトはUTC, ISO8601形式) |
packet_type | パケット種別 |
sequence_number | シーケンス番号(App_Tweliteの場合は時間) |
source_serial_id | 送信元シリアルID |
source_logical_id | 送信元論理デバイスID |
lqi | 電波通信品質(8bit) |
supply_voltage | 電源電圧(mV) |
destination_logical_id | 送信先論理デバイスID |
relay_count | 中継回数 |
periodic | 定期送信パケットか否か |
di_changed | 各デジタルインタフェースの入力変化の有無 |
di_state | 各デジタルインタフェースの入力状態 |
ai_voltage | 各アナログインタフェースの入力電圧 |
mwings_implementation | MWings の実装(将来のための情報) |
mwings_version | MWings のバージョン |
hostname | 受信したホストの名称 |
system_type | 受信したホストのシステムの種別 |
spread=True
の場合は、リスト形式を使わずに _1
, _2
といった形で値を分割します。
データを CSV ファイルに記録するサンプル rx_export_csv_durable.py
もございます。
コマンドラインツールとして、長期間の受信データの記録にそのまま使うことができて便利です。
python rx_export_csv_durable.py -h
usage: rx_export_csv_durable.py [-h] [-v] [-s]
Log packets from App_Wings to csv, line by line
options:
-h, --help show this help message and exit
-v, --verbose include system information
-s, --sort sort columns in the output
ここでは、TWELITE STAGE APP の場合と同様に準備した TWELITE DIP の子機との通信に特化したスクリプトを作成してみましょう。
TWELITE DIP 子機の配線例
TWELITE DIP の DI1
ピンに接続したスイッチの状態表示と、DO1
ピンに接続した LED の制御を目標とします。
rx_print_json.py
を改変し、超簡単!標準アプリのデータだけを受信するように単純化したスクリプトを作成します。
# -*- coding:utf-8 -*-
from zoneinfo import ZoneInfo
import mwings as mw
# Main function
def main() -> None:
# Create a twelite object
twelite = mw.Twelite(mw.utils.ask_user_for_port())
# Use JST for received data
twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
# Register event handlers
@twelite.on(mw.common.PacketType.APP_TWELITE)
def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
print(packet.to_json(verbose=False, spread=True))
# Start receiving
try:
# Set as daemon thread
twelite.daemon = True
# Start the thread, Join to the main thread
twelite.start()
print("Started receiving")
while True:
twelite.join(0.5)
except KeyboardInterrupt:
# Stop the thread
print("Flushing...")
twelite.stop()
print("Completed")
if __name__ == "__main__":
# Call the main function
main()
スクリプトの仕組み
twelite.start()
を呼び出すことで受信のためのスレッドを開始し、twelite.on
によってアプリの種別ごとに登録したイベントハンドラ(ここではon_app_twelite()
)を受信のたびに呼び出してもらっています。
詳しい解説は MWings for Python マニュアルのスクリプト解説 をご覧ください。
これまでのスクリプトでは、to_json()
メソッドによって、すべてのデータを JSON 文字列として出力していました。
ここでは、DI1
ピンの状態だけをparsers.app_twelite.ParsedPacket
の di_state
から取得し、仮想的なLEDをターミナルに表示します。
on_app_twelite()
ハンドラを以下のように書き換えてください。
def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
print(f"\rDO1 LED: {"🔴" if packet.di_state[0] else "⚪"}", end='', flush=True)
次のようにして、TWELITE DIP の DI
ピンに接続したスイッチを押すたびに赤く光ったら成功です。
表示例
TWELITE STICK が受信したデータを表示するだけではなく、TWELITE STICK からデータを送信することもできます。
スクリプトのメインループでは、threading.Thread.join()
を0.5秒おきに呼び出すことで、メインスレッドの終了を検知した際に受信スレッドも終了できるようにしています。
# Start the thread, Join to the main thread
twelite.start()
print("Started receiving")
while True:
twelite.join(0.5)
この仕組みを利用して、メインループから TWELITE DIP の DO1
ピンを制御し、0.5秒おきに LED を点滅させてみましょう。
先のスクリプトを、次のように改変してください。これまでの変更も適用した全体を示します。
# -*- coding:utf-8 -*-
from zoneinfo import ZoneInfo
from typing import Any
import mwings as mw
# Main function
def main() -> None:
# Create a twelite object
twelite = mw.Twelite(mw.utils.ask_user_for_port())
# Use JST for received data
twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
# Register event handlers
@twelite.on(mw.common.PacketType.APP_TWELITE)
def on_app_twelite(packet: mw.parsers.app_twelite.ParsedPacket) -> None:
print(f"\rDO1 LED: {"🔴" if packet.di_state[0] else "⚪"}", end='', flush=True)
# Initialize command
initial: dict[str, Any] = {
"destination_logical_id": 0x78, # All child devices
"di_to_change": [True, False, False, False], # Enable DI1
"di_state": [False, False, False, False], # Initial state of DIx
}
command = mw.serializers.app_twelite.Command(**initial)
# Toggle the DI1 state
def toggle_di1() -> None:
command.di_state[0] = not command.di_state[0]
twelite.send(command)
# Start receiving
try:
# Set as daemon thread
twelite.daemon = True
# Start the thread, Join to the main thread
twelite.start()
print("Started receiving")
while True:
toggle_di1() # Send
twelite.join(0.5) # Receive
except KeyboardInterrupt:
# Stop the thread
print("Flushing...")
twelite.stop()
print("Completed")
if __name__ == "__main__":
# Call the main function
main()
スクリプトの仕組み
はじめにserializers.app_twelite.command
の内容を初期化し、di_state[0]
を反転させて送信するクロージャ toggle_di1()
を定義しています。メインループから toggle_di1()
を呼び出すことで、LED を点滅させます。
このスクリプトを実行すると、先ほどと同様に仮想的なLEDを表示します。
表示例
その一方で、TWELITE DIP の DO1
ピンに接続した LED は点滅し続けるはずです。スクリプトを終了すると、点滅は止まります。
Python を使って子機のデータをウェブサーバへ送信する
Python を使って子機と通信するためのグラフィカルUIを備えたアプリケーションを構築する