セクションの複数ページをまとめています。 印刷またはPDF形式で保存...

もとのページに戻る

2025-08-01 現在

Python による子機との通信(ウェブサーバIoT編)

Python を使って子機のデータをウェブサーバへ送信する
    実用に近い応用編として、ウェブサーバを利用した IoT システムを構築します。

    TWELITE ARIA の温湿度データを収集

    温湿度センサータグ TWELITE ARIA から受信した温湿度データをウェブサーバへ送信し、ページ上のグラフに表示する簡単な IoT システムを構築してみましょう。

    ThingSpeakの表示例

    ThingSpeakの表示例

    TWELITE DIP を対象とした基本編のスクリプトでは DI1DO1 の簡単な操作に留めましたが、実際の IoT システムでは、REST API などを利用して、取得したデータをさらに上位のサーバへ送信する必要があります。

    ThingSpeak について

    ここでは、MathWorks のサービス ThingSpeakMWings ライブラリを組み合わせて使います。

    アカウントの作成

    ThingSpeak のサイトへアクセスし、MathWorks のアカウントを作成します。

    Channel の作成

    “Channel” を作成し、次のように設定します(“Name"や"Description"は適当で構いません)

    Channelの設定例

    Channelの設定例

    API Keys の取得

    Channel ページの “API Keys” タブに移動し、16文字の “Write API key” を控えておきます。

    TWELITE ARIA の設定と起動

    TWELITE ARIA の設定変更

    TWELITE ARIA の設定を変更し、TWELITE ARIA モード送信間隔を20秒以上とします(サーバへの過負荷を避けるため)。

    TWELITE ARIA の起動

    CR2032 電池を投入し、TWELITE ARIA を起動します。

    CR2032電池を挿入

    CR2032電池を挿入

    スクリプトの作成と実行

    モジュールの導入

    Python 3.12 以降と mwings(またはmwingslite)およびrequestsモジュールを導入します。

    
    
    pip install mwings requests

    スクリプトの作成

    後述のスクリプト stick_aria_thingspeak.py を作成してください。mwings.parsers.app_ariaを使ってデータを受信し、requestsモジュールを使って ThingSpeak へ HTTP GET リクエストを送信します。

    # -*- coding:utf-8 -*-
    
    from zoneinfo import ZoneInfo
    from time import perf_counter
    
    import mwings as mw
    import requests
    
    
    API_KEY = "XXXXXXXXXXXXXXXX"  # Replace with your ThingSpeak API key
    BASE_URL = "https://api.thingspeak.com/update"
    SEND_MIN_INTERVAL = 20  # Minimum interval in seconds to send data to ThingSpeak
    
    
    # Main function
    def main() -> None:
        # Create a twelite object
        twelite = mw.Twelite(mw.utils.ask_user_for_port())
    
        # Use JST for received data
        twelite.set_timezone(ZoneInfo("Asia/Tokyo"))
    
        # Initialize last send time
        last_send_time = perf_counter() - SEND_MIN_INTERVAL
    
        # Initialize the target serial ID
        target_serial_id = -1
    
        # Register event handlers
        @twelite.on(mw.common.PacketType.APP_ARIA)
        def on_app_aria(packet: mw.parsers.app_aria.ParsedPacket) -> None:
    
            # Filter packets by serial ID
            if target_serial_id < 0:
                # Set the serial ID from the received packet
                target_serial_id = packet.source_serial_id
                print(f"Serial ID set to {target_serial_id:08X}")
            elif packet.source_serial_id != target_serial_id:
                # Ignore packets from other serial ID devices
                print(
                    f"Ignoring packet from serial ID {packet.source_serial_id:08X}, expected {target_serial_id:08X}"
                )
                return
    
            # Throttle sending to ThingSpeak
            if perf_counter() - last_send_time < SEND_MIN_INTERVAL:
                print("Skipping send due to minimum interval")
                return  # Skip sending if within the minimum interval
            last_send_time = perf_counter()  # Update last send time
    
            # Send data to ThingSpeak
            payload = {
                "api_key": API_KEY,
                "field1": f"{packet.temp_100x / 100.0:.2f}",  # Temperature
                "field2": f"{packet.humid_100x / 100.0:.2f}",  # Humidity
                "field3": f"{packet.supply_voltage}",  # Supply voltage (mV)
                "field4": f"{packet.lqi}",  # Link Quality Indicator
            }
            response = requests.get(BASE_URL, params=payload)
    
            # Check the response status
            if response.status_code == 200:
                print(f"OK: entry ID = {response.text}")
            else:
                print(f"NG: status code = {response.status_code}")
    
        # Start receiving
        try:
            # Set as daemon thread
            twelite.daemon = True
            # Start the thread, Join to the main thread
            twelite.start()
            print("Started receiving")
            while True:
                twelite.join(0.5)  # Receive
        except KeyboardInterrupt:
            # Stop the thread
            print("Flushing...")
            twelite.stop()
            print("Completed")
    
    
    if __name__ == "__main__":
        # Call the main function
        main()

    スクリプトの実行

    スクリプトを実行します。

    
    
    python stick_aria_thingspeak.py

    TWELITE ARIA からのデータを受信し、正常にデータを送信できたなら、次のようにして値の続き番号を示す Entry ID が表示されます。

    Started receiving
    Serial ID set to 8201C2DC
    OK: entry ID = 1
    OK: entry ID = 2
    OK: entry ID = 3
    OK: entry ID = 4
    OK: entry ID = 5
    ...

    ThingSpeak の “Private View” タブをクリックしてください。送信されたデータがグラフに表示されるはずです。

    初期状態では、Y軸の範囲が動的に変化します。各グラフの✏️ボタンから最大最小値などを設定できます。

    約2日分のデータを表示する様子

    約2日分のデータを表示する様子

    スクリプトの解説

    サーバへ送信するデータは、次の部分で構成しています。

    # Send data to ThingSpeak
    payload = {
        "api_key": API_KEY,
        "field1": f"{packet.temp_100x / 100.0:.2f}",  # Temperature
        "field2": f"{packet.humid_100x / 100.0:.2f}",  # Humidity
        "field3": f"{packet.supply_voltage}",  # Supply voltage (mV)
        "field4": f"{packet.lqi}",  # Link Quality Indicator
    }
    response = requests.get(BASE_URL, params=payload)

    mwings.parsers.app_aria から温湿度とコイン電池の電源電圧、0-255の数値で表現される電波通信品質(LQI)を取得し、文字列に変換したうえで GET リクエストのクエリとしています。