2025年12月9日 星期二

pi3 GPIO key scan !

 Python、Redis 驅動(redis-py)和 Raspberry Pi GPIO 庫(RPi.GPIO)的完整範例流程,用於建立一個背景執行緒來定時讀取 GPIO 狀態並更新 Redis 鍵值。

⚙️ 流程步驟與程式碼範例

步驟 1: 安裝必要的 Python 函式庫

您需要安裝 Redis 客戶端函式庫 (redis) 和用於控制 GPIO 的函式庫 (RPi.GPIO)。

Bash
# 確保 pip 已經安裝
sudo apt install python3-pip -y

# 安裝 redis-py 和 RPi.GPIO
pip3 install redis RPi.GPIO

步驟 2: 建立 Python 程式碼 (gpio_redis_buffer.py)

請將以下程式碼儲存為 gpio_redis_buffer.py。此程式碼定義了一個獨立的執行緒,負責週期性地讀取 GPIO 腳位的 HIGH/LOW 狀態,並將其值(10)寫入到 Redis。

Python
import redis
import RPi.GPIO as GPIO
import threading
import time
import sys

# --- 配置區 ---

# 1. Redis 配置
# 假設 Redis Server 運行在本機的預設埠 6379
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379

# 2. GPIO 腳位與 Redis Key 映射
# 使用 BCM 編號模式,腳位號碼即為 GPIO 編號
# 鍵名應與您要求的 p17, p27, p22 一致
GPIO_CONFIG = {
    17: "p17",
    27: "p27",
    22: "p22"
}
# 定時讀取間隔 (秒)
READ_INTERVAL = 1 

# -----------------

# 初始化 Redis 連線
try:
    r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
    # 測試連線是否成功
    r.ping()
    print("Redis 連線成功。")
except redis.exceptions.ConnectionError as e:
    print(f"Redis 連線失敗: {e}")
    sys.exit(1)


def gpio_reader_thread(r, gpio_pins):
    """
    背景執行緒函式:定時讀取 GPIO 狀態並更新 Redis。
    """
    print(f"背景執行緒已啟動,定時讀取間隔: {READ_INTERVAL} 秒")
    
    # 初始化 GPIO
    try:
        # 使用 BCM 編號方式 (即 GPIOxx 號碼)
        GPIO.setmode(GPIO.BCM) 
        
        # 將所有配置的腳位設定為輸入模式,並啟用下拉電阻
        for pin in gpio_pins.keys():
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
            print(f"  -> 設置 GPIO {pin} (Key: {gpio_pins[pin]}) 為輸入模式。")

    except Exception as e:
        print(f"GPIO 初始化失敗: {e}")
        # 如果 GPIO 初始化失敗,終止執行緒
        return 

    while True:
        try:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] 正在讀取 GPIO...")

            for pin, key in gpio_pins.items():
                # 讀取 GPIO 腳位狀態 (HIGH/LOW)
                pin_state = GPIO.input(pin)
                
                # 將狀態 (1 或 0) 存入 Redis
                # 使用 r.set(key, value)
                r.set(key, pin_state)
                
                print(f"  -> {key} (GPIO {pin}): {pin_state} (已存入 Redis)")
            
            # 等待下一次讀取
            time.sleep(READ_INTERVAL)

        except KeyboardInterrupt:
            # 捕獲執行緒內的中斷訊號
            print("\n背景執行緒收到中斷,準備退出...")
            break
        except Exception as e:
            # 捕獲其他錯誤,並在短暫等待後繼續
            print(f"背景執行緒發生錯誤: {e}. 1 秒後重試...")
            time.sleep(1)

    # 執行緒結束前,清理 GPIO
    GPIO.cleanup()
    print("GPIO 資源已清理。背景執行緒結束。")


if __name__ == "__main__":
    
    # 建立並啟動背景執行緒
    reader_thread = threading.Thread(
        target=gpio_reader_thread, 
        args=(r, GPIO_CONFIG),
        daemon=True # 設置為守護執行緒,主程式退出時會自動結束
    )
    reader_thread.start()
    
    print("\n--- 主程式啟動 ---")
    print("背景執行緒已在運行,請按 Ctrl+C 停止程式並清理 GPIO 資源。")
    print("------------------\n")

    # 主執行緒進入無限循環,保持程式運行
    try:
        while True:
            time.sleep(0.5)
            # 可以加入其他主程式邏輯
            pass
    except KeyboardInterrupt:
        print("\n主程式收到 Ctrl+C,正在等待執行緒結束...")
    finally:
        # 即使主程式退出,因為 reader_thread 是 daemon=True,它也會很快結束
        # 如果不是 daemon 執行緒,這裡需要加上 reader_thread.join()
        GPIO.cleanup()
        print("主程式退出。")
        sys.exit(0)

步驟 3: 執行與驗證

3.1 執行程式

在終端機中運行您建立的 Python 檔案:

Bash
python3 gpio_redis_buffer.py

您將看到程式開始定時輸出讀取狀態。

3.2 驗證 Redis 鍵值

另一個終端機視窗中,使用 redis-cli 連接到 Redis Server,並使用 GETMGET 命令來驗證鍵值是否正在更新。

  1. 連接到 Redis:

    Bash
    redis-cli
    
  2. 讀取鍵值:

    Bash
    MGET p17 p27 p22
    

    ✅ 預期輸出: 您將看到類似 ["0", "0", "0"] 的輸出,其中 01 反映了對應 GPIO 腳位的即時狀態。

    如果您將 GPIO 腳位(例如 GPIO 17)連接到 HIGH(例如 3.3V),並再次執行 MGET,您會看到 p17 的值立即更新為 "1"

  3. 退出 redis-cli

    Bash
    QUIT
    

3.3 停止程式

回到運行 gpio_redis_buffer.py 的終端機,按下 Ctrl+C,程式將會安全退出,並自動執行 GPIO.cleanup() 清理腳位資源。

沒有留言:

張貼留言