Python、Redis 驅動(redis-py)和 Raspberry Pi GPIO 庫(RPi.GPIO)的完整範例流程,用於建立一個背景執行緒來定時讀取 GPIO 狀態並更新 Redis 鍵值。
⚙️ 流程步驟與程式碼範例
步驟 1: 安裝必要的 Python 函式庫
您需要安裝 Redis 客戶端函式庫 (redis) 和用於控制 GPIO 的函式庫 (RPi.GPIO)。
Bash
# 確保 pip 已經安裝
sudo apt install python3-pip -y
# 安裝 redis-py 和 RPi.GPIO
pip3 install redis RPi.GPIO
步驟 2: 建立 Python 程式碼 (gpio_redis_buffer.py)
請將以下程式碼儲存為 gpio_redis_buffer.py。此程式碼定義了一個獨立的執行緒,負責週期性地讀取 GPIO 腳位的 HIGH/LOW 狀態,並將其值(1 或 0)寫入到 Redis。
Python
import redis
import RPi.GPIO as GPIO
import threading
import time
import sys
# --- 配置區 ---
# 1. Redis 配置
# 假設 Redis Server 運行在本機的預設埠 6379
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
# 2. GPIO 腳位與 Redis Key 映射
# 使用 BCM 編號模式,腳位號碼即為 GPIO 編號
# 鍵名應與您要求的 p17, p27, p22 一致
GPIO_CONFIG = {
17: "p17",
27: "p27",
22: "p22"
}
# 定時讀取間隔 (秒)
READ_INTERVAL = 1
# -----------------
# 初始化 Redis 連線
try:
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
# 測試連線是否成功
r.ping()
print("Redis 連線成功。")
except redis.exceptions.ConnectionError as e:
print(f"Redis 連線失敗: {e}")
sys.exit(1)
def gpio_reader_thread(r, gpio_pins):
"""
背景執行緒函式:定時讀取 GPIO 狀態並更新 Redis。
"""
print(f"背景執行緒已啟動,定時讀取間隔: {READ_INTERVAL} 秒")
# 初始化 GPIO
try:
# 使用 BCM 編號方式 (即 GPIOxx 號碼)
GPIO.setmode(GPIO.BCM)
# 將所有配置的腳位設定為輸入模式,並啟用下拉電阻
for pin in gpio_pins.keys():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
print(f" -> 設置 GPIO {pin} (Key: {gpio_pins[pin]}) 為輸入模式。")
except Exception as e:
print(f"GPIO 初始化失敗: {e}")
# 如果 GPIO 初始化失敗,終止執行緒
return
while True:
try:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] 正在讀取 GPIO...")
for pin, key in gpio_pins.items():
# 讀取 GPIO 腳位狀態 (HIGH/LOW)
pin_state = GPIO.input(pin)
# 將狀態 (1 或 0) 存入 Redis
# 使用 r.set(key, value)
r.set(key, pin_state)
print(f" -> {key} (GPIO {pin}): {pin_state} (已存入 Redis)")
# 等待下一次讀取
time.sleep(READ_INTERVAL)
except KeyboardInterrupt:
# 捕獲執行緒內的中斷訊號
print("\n背景執行緒收到中斷,準備退出...")
break
except Exception as e:
# 捕獲其他錯誤,並在短暫等待後繼續
print(f"背景執行緒發生錯誤: {e}. 1 秒後重試...")
time.sleep(1)
# 執行緒結束前,清理 GPIO
GPIO.cleanup()
print("GPIO 資源已清理。背景執行緒結束。")
if __name__ == "__main__":
# 建立並啟動背景執行緒
reader_thread = threading.Thread(
target=gpio_reader_thread,
args=(r, GPIO_CONFIG),
daemon=True # 設置為守護執行緒,主程式退出時會自動結束
)
reader_thread.start()
print("\n--- 主程式啟動 ---")
print("背景執行緒已在運行,請按 Ctrl+C 停止程式並清理 GPIO 資源。")
print("------------------\n")
# 主執行緒進入無限循環,保持程式運行
try:
while True:
time.sleep(0.5)
# 可以加入其他主程式邏輯
pass
except KeyboardInterrupt:
print("\n主程式收到 Ctrl+C,正在等待執行緒結束...")
finally:
# 即使主程式退出,因為 reader_thread 是 daemon=True,它也會很快結束
# 如果不是 daemon 執行緒,這裡需要加上 reader_thread.join()
GPIO.cleanup()
print("主程式退出。")
sys.exit(0)
步驟 3: 執行與驗證
3.1 執行程式
在終端機中運行您建立的 Python 檔案:
Bash
python3 gpio_redis_buffer.py
您將看到程式開始定時輸出讀取狀態。
3.2 驗證 Redis 鍵值
在另一個終端機視窗中,使用 redis-cli 連接到 Redis Server,並使用 GET 或 MGET 命令來驗證鍵值是否正在更新。
連接到 Redis:
Bashredis-cli讀取鍵值:
BashMGET p17 p27 p22✅ 預期輸出: 您將看到類似
["0", "0", "0"]的輸出,其中0或1反映了對應 GPIO 腳位的即時狀態。如果您將 GPIO 腳位(例如 GPIO 17)連接到 HIGH(例如 3.3V),並再次執行
MGET,您會看到p17的值立即更新為"1"。退出
redis-cli:BashQUIT
3.3 停止程式
回到運行 gpio_redis_buffer.py 的終端機,按下 Ctrl+C,程式將會安全退出,並自動執行 GPIO.cleanup() 清理腳位資源。
沒有留言:
張貼留言