2025年9月11日 星期四

twowheel_amr 機器人模擬環境 node and topic

  以下是針對 twowheel_amr 機器人模擬環境中的節點和主題的說明:

節點 (Nodes)

這些節點代表模擬環境中的各個軟體元件,負責特定的功能。

  • /gazebo: 這是 Gazebo 模擬器的主要節點,負責執行物理引擎、渲染和處理所有模擬事件。

  • /gazebo_ros_ray_sensor: 這是模擬世界中一個特定於 twowheel_amr 的雷射掃描感測器節點。通常用於 Gazebo 世界中,以模擬雷射掃描儀的行為。

  • /robot_state_publisher: 這個節點會讀取機器人的 URDF 檔案,並結合 /joint_states 主題發布的關節資訊,計算並發布每個機器人連桿的 3D 姿態。

  • /rqt_gui_py_node_10542: 這是 ROS 2 的圖形化工具 rqt 的實例。它提供了一個圖形介面來監控和除錯 ROS 2 系統。

  • /twowheel_amr/diff_drive_controller: 這個是 twowheel_amr 機器人的差速驅動控制器節點。它訂閱 /twowheel_amr/cmd_vel 主題的指令,並根據這些指令計算每個車輪的轉速,以驅動機器人在 Gazebo 中移動。

  • /twowheel_amr/gazebo_ros_imu_sensor: 這是 twowheel_amr 機器人的 IMU(慣性測量單元)感測器節點。它模擬 IMU 的行為並將數據發布到 /twowheel_amr/imu/data 主題。

  • /twowheel_amr/gazebo_ros_ray_sensor: 這是 twowheel_amr 機器人的雷射掃描感測器節點。它模擬雷射掃描儀的行為並將數據發布到 /twowheel_amr/scan 主題。


主題 (Topics)

這些主題是節點之間交換數據的管道。

  • /clock: 在模擬環境中,這個主題發布模擬時間,取代了系統時鐘。

  • /joint_states: 這個主題發布機器人所有關節的當前位置、速度和力矩,通常由 Gazebo 模擬器發布。

  • /parameter_events: ROS 2 內部主題,用於廣播節點參數的變化。

  • /performance_metrics: 內部主題,提供關於 ROS 2 系統效能的數據。

  • /robot_description: 包含機器人完整的 URDF 或 SDF 描述,供 robot_state_publisher 等節點使用。

  • /rosout: ROS 2 的標準日誌主題,所有節點的日誌資訊都會發布到這裡。

  • /scan: 這是通用雷射掃描主題,可能與/gazebo_ros_ray_sensor節點相關,但由於有/twowheel_amr/scan主題,這個主題可能已經不再活躍使用。

  • /tf 與 /tf_static: 這些主題發布動態和靜態的座標轉換資訊,用於描述機器人連桿、感測器和世界坐標系之間的關係。

  • /twowheel_amr/cmd_vel: 這是指令速度主題,用於遠程控制 twowheel_amr 機器人。當您發布訊息到這個主題時,diff_drive_controller 會讀取並據此移動機器人。

  • /twowheel_amr/imu/datatwowheel_amr 機器人 IMU 感測器發布的數據,通常包含加速度和角速度資訊。

  • /twowheel_amr/scantwowheel_amr 機器人雷射掃描感測器發布的數據,通常用於建圖或避障。

工作流從 gazebo_ros 轉換到 ros_gz_sim

 

ROS 2 啟動 Gazebo 之後,您可以使用 ros2 run 指令,透過 gazebo_ros 套件中的 spawn_entity.py 工具來載入 URDF 檔案。


載入 URDF 到 Gazebo 的指令

要將名為 twowheel.urdf 的 URDF 檔案載入到 Gazebo Sim 中,標準的 ROS 2 指令格式如下:

ros2 run gazebo_ros spawn_entity.py -entity twowheel_amr -file /path/to/your/twowheel.urdf -x 0 -y 0 -z 0

指令參數說明

  • ros2 run gazebo_ros spawn_entity.py: 這是用來執行 gazebo_ros 套件中,負責產生實體的 Python 腳本。

  • -entity twowheel_amr: 指定您要產生的實體名稱。這在 Gazebo 內部會被用作模型的名稱。

  • -file /path/to/your/twowheel.urdf: 指定您 URDF 檔案的完整路徑。 非常重要:您需要將 /path/to/your/ 替換為 twowheel.urdf 檔案在您系統中的實際路徑。

  • -x 0 -y 0 -z 0: 這是可選參數,用來設定模型在 Gazebo 世界中的初始位置。您可以根據需要調整這些值。

備註

  • 環境變數: 為了確保 ros2 指令能找到 gazebo_ros 和其他 ROS 2 套件,請確認您已在終端機中 source 過您的工作空間設定檔 (setup.bash)。

  • Gazebo Sim 運行中: 這個 spawn_entity.py 指令需要 Gazebo Sim 已經在運行中才能成功執行。您通常會先使用一個 launch 檔案啟動 Gazebo,然後在另一個終端機中執行此 spawn 指令。

  • URDF 檔案: 確保您的 twowheel.urdf 檔案內容有效且沒有錯誤。如果 URDF 本身有問題,Gazebo 可能會無法正確載入模型並給出錯誤訊息。




gazebo_ros 是針對 Gazebo 9 (舊版) 和 Gazebo Classic 設計的

而 ros_gz_sim 則是針對新的 Gazebo (GZ) Sim 所設計的

且是 ROS 2 官方推薦的整合方式。

這兩者是不同的套件,提供了不同 Gazebo 版本的 ROS 2 橋接功能。


為什麼推薦使用 ros_gz_sim

  • 官方支持ros_gz_sim 是 Gazebo 專案的官方橋接工具,旨在提供 Gazebo Sim 與 ROS 2 之間的無縫整合。

  • 新功能與效能: Gazebo Sim (或稱 GZ Sim) 提供了許多相較於舊版 Gazebo 的新功能和效能改進,例如更現代的物理引擎和渲染技術。

  • 跨平台相容性ros_gz_sim 提供了 ROS 2 和 Gazebo Sim 之間的通用橋接,可用於任何 ROS 2 支援的平台。

如何將您的工作流從 gazebo_ros 轉換到 ros_gz_sim

您需要將原先使用 gazebo_ros 的命令和啟動檔案替換成 ros_gz_sim 對應的工具。

1. 安裝 ros_gz_sim 套件

您需要先安裝 ros_gz_sim 橋接套件,確保其版本與您的 ROS 2 和 Gazebo Sim 版本相容。

Bash
sudo apt-get install ros-<distro>-ros-gz-sim

將 <distro> 替換為您的 ROS 2 版本,例如 humble 或 iron

2. 啟動 Gazebo Sim

您可以使用 ros_gz_sim 提供的啟動檔案來開啟 Gazebo Sim。

Bash
ros2 launch ros_gz_sim_demos gz_sim.launch.py

這會開啟一個空的 Gazebo 世界。您也可以指定一個世界檔案來啟動 Gazebo:

Bash
ros2 launch ros_gz_sim_demos gz_sim.launch.py gz_args:="-r /path/to/your/world.sdf"

3. 載入模型

ros_gz_sim 使用 ros_gz_sim 套件中的 create 節點來產生實體,而不是舊版的 spawn_entity.py。您通常會在啟動檔案中完成此步驟。

例如,在您的 .launch.py 檔案中加入以下內容:

Python
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    pkg_ros_gz_sim = get_package_share_directory('ros_gz_sim')
    
    # 載入 URDF/SDF 模型
    urdf_path = os.path.join(
        get_package_share_directory('your_robot_description_package'),
        'urdf',
        'twowheel.urdf'
    )
    
    spawn_entity_node = Node(
        package='ros_gz_sim',
        executable='create',
        output='screen',
        arguments=[
            '-file', urdf_path,
            '-name', 'twowheel_amr',
            '-x', '0',
            '-y', '0',
            '-z', '0.5'
        ]
    )
    
    return LaunchDescription([
        spawn_entity_node
    ])

總結:從 gazebo_ros 轉移到 ros_gz_sim 是正確且符合未來的方向。它需要您更改啟動指令和啟動檔案的內容,但能讓您使用最新版本的 Gazebo 和其提供的所有功能。





如何使用 ros_gz_sim 替代 gazebo_ros?

 

如何使用 ros_gz_sim 替代 gazebo_ros

要將 ros2 run gazebo_ros spawn_entity.py 轉換為 ros_gz_sim 的版本,

主要涉及將命令列執行方式改為啟動檔案 (Launch File) 的模式。

ros_gz_sim 提供了 create 節點來執行生成實體的功能,

但它通常作為 LaunchDescription 的一部分來使用。


以下是將您的 spawn_entity.py 命令轉換為 ros_gz_sim 啟動檔案的步驟。


步驟 1:建立啟動檔案

在您的 ROS 2 套件中,建立一個 Python 啟動檔案(例如 spawn_robot.launch.py)。這個檔案會定義如何啟動 Gazebo Sim 並生成您的機器人。

Python
# 文件名: spawn_robot.launch.py

import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    # 這裡假設您的 urdf 檔案在名為 'your_robot_description' 的套件中
    # 您需要將 'your_robot_description' 替換為實際的套件名稱
    urdf_path = os.path.join(
        get_package_share_directory('your_robot_description'),
        'urdf',
        'twowheel.urdf'
    )
    
    # 啟動 Gazebo Sim 的命令
    # 這部分可以用 ros_gz_sim 提供的啟動檔案來替代,例如 ros_gz_sim_demos 中的 gz_sim.launch.py
    # 但這裡我們專注於生成實體的指令
    
    # 使用 ros_gz_sim 套件中的 'create' 節點來生成實體
    spawn_entity_node = Node(
        package='ros_gz_sim',
        executable='create',
        output='screen',
        arguments=[
            '-file', urdf_path,
            '-name', 'twowheel_amr',
            '-x', '0',
            '-y', '0',
            '-z', '0.5' # 建議從地面以上生成
        ]
    )
    
    return LaunchDescription([
        spawn_entity_node
    ])

步驟 2:執行啟動檔案

將 twowheel.urdf 放置在您的 ROS 2 套件的 urdf/ 目錄中,並將上述 spawn_robot.launch.py 檔案放置在 launch/ 目錄中。

然後,在終端機中,首先確保您的工作空間已載入:

Bash
cd ~/data/ros2_example_ws
source install/setup.bash

接下來,您可以執行您的啟動檔案來生成機器人:

Bash
ros2 launch your_robot_description spawn_robot.launch.py

請注意:此指令需要 Gazebo Sim 已經在運行中,才能成功將模型生成到模擬世界中。您可以在另一個終端機中,手動或透過其他啟動檔案來啟動 Gazebo Sim。


為什麼要這樣做?

  • 啟動檔案的優勢:在 ROS 2 中,使用啟動檔案是管理複雜節點和參數的標準方法。它提供了一種更結構化、更可重複的方式來啟動多個節點,而不是單獨執行命令。

  • ros_gz_sim 橋接ros_gz_sim 透過其 create 節點提供了與 Gazebo Sim 的無縫整合。這個節點會監聽 ROS 2 的服務呼叫,並將 URDF/SDF 檔案轉換成 Gazebo Sim 可以理解的實體。



ros2 node & topic

 ros2 node list

ros2 topic list 


從您提供的 ROS 2 輸出結果來看,您的系統正在運行一些標準的機器人 ROS 2 節點和相關的主題。


節點 (Nodes)

節點是 ROS 2 中最小的可執行單元,負責執行特定的任務。

  • /robot_state_publisher: 這個節點非常重要,它會讀取機器人的 URDF(統一機器人描述格式)或 SDF 檔案,並監聽來自 /joint_states 主題的關節狀態訊息。然後,它會計算所有機器人連桿的 3D 姿勢,並將這些資訊以轉換(TF)的形式發布到 /tf 和 /tf_static 主題上。這使得其他節點(例如 Rviz)能夠知道機器人每個部分的準確位置和方向。

  • /rqt_gui_py_node_10542: 這是一個 RQT(ROS 2 Qt)圖形介面工具的節點。rqt 是一個基於插件的框架,提供各種圖形化工具來除錯和互動。這個節點的名稱結尾包含一個數字,通常是為了確保在同一時間運行多個實例時,每個節點都有唯一的名稱。


主題 (Topics)

主題是 ROS 2 中用於節點之間傳輸訊息的命名頻道。

  • /joint_states: 這個主題發布機器人各個關節的當前狀態,例如位置、速度和力矩。通常,這個訊息由模擬器(例如 Gazebo)或真實機器人上的感測器發布。

  • /parameter_events: 這是 ROS 2 系統使用的內部主題,用於廣播節點參數變化的事件。當一個節點的參數被改變時,會發布一個事件訊息,其他節點可以訂閱此主題來追蹤這些變化。

  • /robot_description: 這個主題通常由一個節點發布,包含了機器人完整的 URDF 或 SDF 描述。robot_state_publisher 和 Rviz 等節點會訂閱這個主題來獲取機器人的結構和幾何資訊。

  • /rosout: 這是 ROS 2 的標準日誌主題。所有節點發出的日誌訊息(例如 INFOWARNERROR)都會發布到這個主題,通常由 ros2 run rosout rosout 節點處理,並顯示在終端機上。

  • /tf: 這個主題用於發布動態的轉換(Transformations)資訊,通常包含機器人連桿、感測器或世界座標系之間的即時位置和方向關係。robot_state_publisher 節點就是這個主題的主要發布者。

  • /tf_static: 這個主題與 /tf 類似,但它用於發布靜態的轉換資訊,這些資訊在機器人運作期間不會改變。例如,相機與其支架之間的固定轉換關係,通常會在這個主題上只發布一次。這樣可以避免重複發布不變的資訊,從而提高效率。

ROS 2 cmd_vel 指令從 gazebo_ros 轉換到 ros_gz_sim

  將您的 ROS 2 cmd_vel 指令從 gazebo_ros 轉換到 ros_gz_sim

主要是要將 topic 名稱從 /twowheel_amr/cmd_vel 調整為 ros_gz_sim 預設的橋接格式。



問題分析

gazebo_ros 和 ros_gz_sim 兩者之間最大的不同在於 ROS Topic 和 Gazebo Topic 之間的命名約定

  • gazebo_ros: 舊版的 Gazebo 橋接器通常會為每個機器人模型創建一個獨立的命名空間,例如 /twowheel_amr。因此,cmd_vel topic 的完整名稱會是 /twowheel_amr/cmd_vel

  • ros_gz_sim: 新版 ros_gz_sim 的橋接器,預設的 Topic 命名格式為 /model/<model_name>/cmd_vel。這提供了一種更標準且可預測的命名模式,方便跨不同模型進行控制。


解決方案

根據 ros_gz_sim 的命名慣例,您需要修改 ROS Topic 的名稱。

將原來的指令:

Bash
ros2 topic pub -r 5 /twowheel_amr/cmd_vel geometry_msgs/Twist "{linear: {x: 0.2}, angular: {z: 0.0}}"

更改為:

Bash
ros2 topic pub -r 5 /model/twowheel_amr/cmd_vel geometry_msgs/msg/Twist "{linear: {x: 0.2}, angular: {z: 0.0}}"

額外注意事項

  1. 確保橋接正常運作: 在 gz_sim 中,您需要確保 ros_gz_bridge 橋接節點已經運行,並正確設定了從 ROS 2 /model/twowheel_amr/cmd_vel Topic 到 Gazebo 的 cmd_vel Topic 的轉發。

  2. Twist 訊息類型: 請注意,ros_gz_bridge 的 ROS 2 訊息類型是 geometry_msgs/msg/Twist,與舊版指令中的 geometry_msgs/Twist 略有不同。在 ROS 2 中,通常會加上 /msg 子目錄以明確指定訊息類型。

  3. 模型與驅動: 您的 Gazebo 模型 (twowheel_amr) 必須包含能夠接收 cmd_vel 命令並驅動車輪的 Gazebo 插件(例如 libignition-gazebo-diff-drive-system)。如果沒有這個插件,即使您發布了正確的 Topic,模型也不會移動。


2025年9月10日 星期三

Gzserver 和 Gzclient 確實可以在不同 IP 的主機上運行

 Gzserver 和 Gzclient 確實可以在不同 IP 的主機上運行,這是 Gazebo 模擬器設計的核心優勢之一,允許您將複雜的物理模擬(在 Gzserver 上)與圖形化視覺化(在 Gzclient 上)分開。這對於遠端控制、高效能計算或多使用者協作非常有用。

如何建立 Gzserver 與 Gzclient 的遠端連線

要讓 Gzserver 和 Gzclient 在不同 IP 的主機上成功連線,您需要確保以下條件和步驟:

步驟一:設定環境變數

這是最關鍵的步驟。Gzserver 和 Gzclient 預設會尋找一個名為 GAZEBO_MASTER_URI 的環境變數來建立連線。這個變數會告訴客戶端伺服器的位置。

  • 在運行 Gzserver 的主機上:

    Gzserver 在啟動時會綁定一個主機位址和埠號。您可以透過 export 設定 GAZEBO_MASTER_URI,也可以讓它使用預設值。預設的埠號是 11345

    # 這行指令是選擇性,如果未指定,Gazebo會自動使用預設的 'localhost:11345'
    export GAZEBO_MASTER_URI=http://<伺服器IP位址>:11345
    

    然後,啟動伺服器:

    gzserver --verbose empty.world
    
  • 在運行 Gzclient 的主機上:

    這是最重要的一步。您必須將客戶端的 GAZEBO_MASTER_URI 設定為伺服器的 IP 位址。

    # 這裡的 <伺服器IP位址> 是運行 gzserver 的主機的實際 IP
    export GAZEBO_MASTER_URI=http://<伺服器IP位址>:11345
    

    接著,啟動客戶端:

    gzclient
    

如果一切設定正確,Gzclient 將會連線到遠端伺服器,並開始顯示模擬畫面。

步驟二:確保網路連線暢通

  • 防火牆設定:請確保伺服器主機的防火牆允許來自客戶端主機的連線,特別是 Gazebo 使用的 11345 埠號。如果需要,您可能需要新增一條防火牆規則。

  • IP 位址:確保客戶端填寫的 IP 位址是伺服器主機在網路中的可達位址。如果兩台主機在同一個區域網路,通常是 192.168.x.x10.0.x.x 這樣的位址。


ROS2 與 Gazebo Sim (新版) 的連線

對於 ROS2 和 Gazebo Sim(例如 Fortress 或 Garden),連線機制更加簡化。它依賴於 ROS2 的 DDS (Data Distribution Service) 通訊協定。

ROS2 的節點(包括 Gazebo Sim 的 ROS 插件)使用 DDS 進行發現和通訊。只要兩台電腦在同一個網路中,且 ROS2 的環境變數(例如 ROS_DOMAIN_ID)設定一致,它們就能自動找到彼此並建立連線。

因此,在 ROS2 環境中,您通常只需要確保:

  1. ROS2 環境變數一致:在兩台主機上執行 source 指令來載入相同的 ROS2 版本,並確保 ROS_DOMAIN_ID 環境變數相同。

  2. 啟動 Gazebo Sim

    • 在伺服器主機上啟動 Gazebo Sim,通常是透過 ROS2 啟動檔案:

      Bash
      ros2 launch my_robot_pkg my_robot.launch.py
      
    • 在客戶端主機上,您只需要啟動一個 ROS 節點來處理資料或發送命令,例如啟動 RVIZ2 來視覺化:

      Bash
      ros2 run rviz2 rviz2
      

總之,Gzserver 和 Gzclient 可以在不同主機上連線。在 Gazebo Classic 中,這需要手動設定 GAZEBO_MASTER_URI;而在 ROS2 的新版 Gazebo Sim 中,它則透過更先進的 DDS 機制自動完成。



2025年9月8日 星期一

shipcap_skeleton_paddleqr.zip

 已完成打包與擴充。下載:shipcap_skeleton_paddleqr.zip。內容含:

  • PaddleOCR worker(apps/ocr_worker/worker_paddleocr.py)。

  • Tesseract 版 worker。

  • USB HID 鍵盤模式掃碼(listener_hid.py,evdev)。

  • USB Serial 掃碼(listener_serial.py,pyserial)。

  • FastAPI 後端、Web 最小頁、Capture、systemd 範例與需求檔。對應規格之 FR/NFR、流程與 API。

最小測試:

# 啟動後端 cd shipcap3/apps/ingest_api && uvicorn main:app --host 0.0.0.0 --port 8000 # 建單與拍照(另開視窗) python shipcap3/apps/capture/capture.py SHP0001 image1 python shipcap3/apps/capture/capture.py SHP0001 image2 # 跑 PaddleOCR python shipcap3/apps/ocr_worker/worker_paddleocr.py SHP0001 /mnt/data/shipcap3/data/shipments/$(date +%Y)/$(date +%m)/$(date +%d)/SHP0001_image1_*.jpg # 掃碼(HID 鍵盤模式,指定裝置與單號) sudo -E python shipcap3/apps/qrcode_listener/listener_hid.py /dev/input/eventX SHP0001 # 或 Serial python shipcap3/apps/qrcode_listener/listener_serial.py /dev/ttyACM0 115200 SHP0001

需要我再加 Dockerfile、udev 規則與 docker-compose 一鍵啟動。 

出軟體建制的架構 跟 範例

 結論:採「多服務(同機)」架構,對應你給的 FR/NFR 與資料模型,

提供可直接啟動的專案骨架、API、DB schema、Docker Compose、systemd 與最小可用範例程式。

一、架構(對應規格)

  • 服務:capture(拍照1/2)、qrcode(USB HID/Serial)、ocringest-api(建檔/查詢/匯出)、web-ui(覆核/查詢)、sync(選配,多站同步)、backup

  • 資料層:SQLite 單站;要集中改用 PostgreSQL。影像按日期樹狀存,計算 SHA-256。

  • 目標流程:輸入出貨單號→拍照1→OCR→覆核→拍照2→掃 QR→建檔→可查詢/匯出。

  • 命名與路徑:/data/shipments/{YYYY}/{MM}/{DD}/SHP123_image1_20250115T103045Z.jpg;縮圖保存於 thumbs/

  • 權限與稽核:角色、登入、AuditLogs。

  • KPI:≥6–8 筆/分、OCR 成功門檻與人工覆核機制。

二、專案目錄(Mono-repo)

shipcap/ apps/ ingest_api/ # FastAPI (DB/檔案/匯出) web_ui/ # 覆核/查詢頁(FastAPI+Jinja2 或 Qt 選一) capture/ # OpenCV 拍照1/2 + 縮圖 + SHA256 ocr_worker/ # Tesseract/PaddleOCR + 版面模板 qrcode_listener/ # HID/Serial 讀碼並關聯當前作業 sync_agent/ # 選配:上傳中央 DB/NAS libs/ db/ # SQLAlchemy models, DAL core/ # 共用:config、paths、hash、schemas deploy/ docker-compose.yml systemd/ capture.service ocr.service ingest-api.service web-ui.service migrations/ tests/ .env.example config.yaml README.md

三、資料庫 Schema(SQLite / PostgreSQL)

-- Shipments CREATE TABLE IF NOT EXISTS shipments( id INTEGER PRIMARY KEY, ship_no TEXT UNIQUE NOT NULL, customer TEXT, ship_date DATE, operator TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Images CREATE TABLE IF NOT EXISTS images( id INTEGER PRIMARY KEY, shipment_id INTEGER NOT NULL REFERENCES shipments(id), kind TEXT CHECK(kind IN ('image1','image2')) NOT NULL, path TEXT NOT NULL, width INT, height INT, size_bytes INT, sha256 TEXT, taken_at TIMESTAMP ); -- QRCodes CREATE TABLE IF NOT EXISTS qrcodes( id INTEGER PRIMARY KEY, shipment_id INTEGER NOT NULL REFERENCES shipments(id), payload TEXT NOT NULL, symbology TEXT, raw_text TEXT, scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- OCR tables CREATE TABLE IF NOT EXISTS ocr_tables( id INTEGER PRIMARY KEY, shipment_id INTEGER NOT NULL REFERENCES shipments(id), json_data TEXT NOT NULL, version TEXT, ocr_engine TEXT, confidence_avg REAL, reviewed_by TEXT, reviewed_at TIMESTAMP ); -- AuditLogs CREATE TABLE IF NOT EXISTS audit_logs( id INTEGER PRIMARY KEY, user TEXT, action TEXT, subject TEXT, payload_json TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );

(欄位對應規格之資料模型與稽核)

四、API(FastAPI 最小集合)

# apps/ingest_api/main.py from fastapi import FastAPI, UploadFile, File, Form, HTTPException from fastapi.responses import StreamingResponse from pathlib import Path import hashlib, time, io, zipfile, sqlite3, json, os app = FastAPI(title="shipcap-ingest") DB = os.getenv("DB_PATH", "/data/shipcap.db") ROOT = Path(os.getenv("DATA_ROOT", "/data/shipments")) def db(): conn = sqlite3.connect(DB); conn.row_factory = sqlite3.Row return conn @app.post("/api/shipments") def create_shipment(ship_no: str = Form(...), customer: str = Form(""), ship_date: str = Form(""), operator: str = Form("")): with db() as conn: conn.execute("INSERT OR IGNORE INTO shipments(ship_no,customer,ship_date,operator) VALUES(?,?,?,?)", (ship_no, customer, ship_date, operator)) s = conn.execute("SELECT * FROM shipments WHERE ship_no=?", (ship_no,)).fetchone() return dict(s) @app.post("/api/shipments/{ship_no}/images") def upload_image(ship_no: str, kind: str = Form(...), file: UploadFile = File(...)): assert kind in ("image1","image2") ts = time.strftime("%Y%m%dT%H%M%SZ", time.gmtime()) subdir = ROOT / time.strftime("%Y/%m/%d") subdir.mkdir(parents=True, exist_ok=True) fn = f"{ship_no}_{kind}_{ts}.jpg" data = file.file.read() p = subdir / fn p.write_bytes(data) sha256 = hashlib.sha256(data).hexdigest() with db() as conn: s = conn.execute("SELECT id FROM shipments WHERE ship_no=?", (ship_no,)).fetchone() if not s: raise HTTPException(404, "shipment not found") conn.execute("""INSERT INTO images(shipment_id,kind,path,size_bytes,sha256,taken_at) VALUES(?,?,?,?,?,datetime('now'))""", (s["id"], kind, str(p), len(data), sha256)) return {"path": str(p), "sha256": sha256} @app.post("/api/shipments/{ship_no}/ocr") def post_ocr(ship_no: str, json_data: str = Form(...), engine: str = Form("tesseract"), conf: float = Form(0.0)): with db() as conn: s = conn.execute("SELECT id FROM shipments WHERE ship_no=?", (ship_no,)).fetchone() if not s: raise HTTPException(404, "shipment not found") conn.execute("""INSERT INTO ocr_tables(shipment_id,json_data,ocr_engine,confidence_avg) VALUES(?,?,?,?)""", (s["id"], json_data, engine, conf)) return {"ok": True} @app.post("/api/shipments/{ship_no}/qrcode") def add_qr(ship_no: str, payload: str = Form(...), symbology: str = Form("QR"), raw_text: str = Form("")): with db() as conn: s = conn.execute("SELECT id FROM shipments WHERE ship_no=?", (ship_no,)).fetchone() if not s: raise HTTPException(404, "shipment not found") conn.execute("""INSERT INTO qrcodes(shipment_id,payload,symbology,raw_text) VALUES(?,?,?,?)""", (s["id"], payload, symbology, raw_text)) return {"ok": True} @app.get("/api/shipments") def list_shipments(q: str = "", ship_no: str = "", start: str = "", end: str = ""): sql = "SELECT * FROM shipments WHERE 1=1"; args=[] if ship_no: sql+=" AND ship_no=?"; args.append(ship_no) if q: sql+=" AND (customer LIKE ? OR operator LIKE ?)"; args += [f"%{q}%", f"%{q}%"] if start: sql+=" AND date(created_at)>=date(?)"; args.append(start) if end: sql+=" AND date(created_at)<=date(?)"; args.append(end) with db() as conn: rows = [dict(r) for r in conn.execute(sql, args).fetchall()] return rows @app.get("/api/shipments/{ship_no}/export") def export_zip(ship_no: str): with db() as conn: s = conn.execute("SELECT id FROM shipments WHERE ship_no=?", (ship_no,)).fetchone() if not s: raise HTTPException(404, "shipment not found") imgs = conn.execute("SELECT path FROM images WHERE shipment_id=?", (s["id"],)).fetchall() ocr = conn.execute("SELECT json_data FROM ocr_tables WHERE shipment_id=? ORDER BY id DESC LIMIT 1", (s["id"],)).fetchone() qrs = conn.execute("SELECT payload FROM qrcodes WHERE shipment_id=?", (s["id"],)).fetchall() zbuf = io.BytesIO() with zipfile.ZipFile(zbuf, "w", zipfile.ZIP_DEFLATED) as z: for r in imgs: z.write(r["path"], Path(r["path"]).name) z.writestr("ocr.json", ocr["json_data"] if ocr else "{}") z.writestr("qrcodes.txt", "\n".join([r["payload"] for r in qrs])) zbuf.seek(0) return StreamingResponse(zbuf, media_type="application/zip", headers={"Content-Disposition": f'attachment; filename="{ship_no}.zip"'})

(端點對應規格第 16 節 API 草案與第 5–9 節功能)

五、拍照、OCR、掃碼最小服務

capture:

# apps/capture/capture.py import cv2, time, hashlib from pathlib import Path from datetime import datetime import requests, os, sys API = os.getenv("API","http://127.0.0.1:8000") DATA_ROOT = Path(os.getenv("DATA_ROOT","/data/shipments")) CAM_INDEX = int(os.getenv("CAM_INDEX","0")) def shot(ship_no: str, kind: str): cap = cv2.VideoCapture(CAM_INDEX, cv2.CAP_V4L2) ok, frame = cap.read(); cap.release() if not ok: raise SystemExit("camera read fail") ts = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") outdir = DATA_ROOT / datetime.utcnow().strftime("%Y/%m/%d") outdir.mkdir(parents=True, exist_ok=True) path = outdir / f"{ship_no}_{kind}_{ts}.jpg" cv2.imwrite(str(path), frame, [cv2.IMWRITE_JPEG_QUALITY, 92]) # 回報 API 建檔 with open(path, "rb") as f: requests.post(f"{API}/api/shipments/{ship_no}/images", data={"kind": kind}, files={"file": ("img.jpg", f, "image/jpeg")}) if __name__ == "__main__": ship_no, kind = sys.argv[1], sys.argv[2] # kind=image1|image2 shot(ship_no, kind)

ocr_worker:

# apps/ocr_worker/worker.py import pytesseract, json, cv2, sys, os, requests API = os.getenv("API","http://127.0.0.1:8000") def run(ship_no: str, img_path: str): img = cv2.imread(img_path) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) txt = pytesseract.image_to_string(gray, lang="chi_tra+eng") # 簡單解析:示意(實務用模板/規則) result = {"ship_no": ship_no, "raw": txt} requests.post(f"{API}/api/shipments/{ship_no}/ocr", data={"json_data": json.dumps(result, ensure_ascii=False), "engine": "tesseract", "conf": 0.90}) if __name__=="__main__": run(sys.argv[1], sys.argv[2])

qrcode_listener:

# apps/qrcode_listener/listener.py import sys, os, requests, time API = os.getenv("API","http://127.0.0.1:8000") CUR_SHIP = None def set_current(ship_no): # 可由 web-ui 設定或輸入 global CUR_SHIP; CUR_SHIP = ship_no requests.post(f"{API}/api/shipments", data={"ship_no": ship_no}) def on_scan(payload: str): if not CUR_SHIP: return requests.post(f"{API}/api/shipments/{CUR_SHIP}/qrcode", data={"payload": payload, "symbology":"QR"}) if __name__=="__main__": set_current(sys.argv[1]) # e.g. python listener.py SHP0001 for line in sys.stdin: # 示意:從 HID/Serial 驅動換成標準輸入 on_scan(line.strip())

(拍照、OCR、掃碼職責對應 FR-01/02/03)

六、Web 覆核/查詢最小頁(Jinja2)

# apps/web_ui/main.py from fastapi import FastAPI, Request from fastapi.templating import Jinja2Templates import sqlite3, os app = FastAPI() DB = os.getenv("DB_PATH","/data/shipcap.db") tmpl = Jinja2Templates("templates") @app.get("/") def index(r: Request, q: str=""): conn = sqlite3.connect(DB); conn.row_factory=sqlite3.Row rows = conn.execute("SELECT ship_no, customer, created_at FROM shipments ORDER BY id DESC LIMIT 100").fetchall() return tmpl.TemplateResponse("index.html", {"request": r, "rows": rows})

(頁面包含查詢、預覽、覆核欄位,對應 7.1)

七、設定、部署與維運

config.yaml(關鍵節點)

data_root: /data/shipments db_path: /data/shipcap.db camera: index: 0 resolution: [3264, 2448] ocr: engine: tesseract # or paddleocr security: jwt_secret: "change_me" backup: daily: "02:00"

(對應設定檔與備份需求)

docker-compose(同機部署)

version: "3.8" services: ingest: build: ./apps/ingest_api environment: - DATA_ROOT=/data/shipments - DB_PATH=/data/shipcap.db volumes: - ./data:/data ports: ["8000:8000"] command: uvicorn main:app --host 0.0.0.0 --port 8000 webui: build: ./apps/web_ui environment: - DB_PATH=/data/shipcap.db volumes: [ "./data:/data", "./apps/web_ui/templates:/app/templates" ] ports: ["8080:8080"] command: uvicorn main:app --host 0.0.0.0 --port 8080 ocr: build: ./apps/ocr_worker environment: [ "API=http://ingest:8000" ] volumes: [ "./data:/data" ] capture: build: ./apps/capture privileged: true devices: [ "/dev/video0:/dev/video0" ] environment: - API=http://ingest:8000 - DATA_ROOT=/data/shipments volumes: [ "./data:/data" ]

(容器化屬「可選」;亦可用 systemd)

systemd 範例

# deploy/systemd/ingest-api.service [Unit] Description=shipcap ingest api After=network.target [Service] WorkingDirectory=/opt/shipcap/apps/ingest_api ExecStart=/usr/bin/uvicorn main:app --host 0.0.0.0 --port 8000 Environment=DATA_ROOT=/data/shipments Environment=DB_PATH=/data/shipcap.db Restart=always [Install] WantedBy=multi-user.target

(符合規格之 systemd、logrotate、備份排程)

八、最小驗收腳本(單筆 E2E)

# 1) 建單 curl -F ship_no=SHP0001 -F customer=Acme -F ship_date=2025-09-09 -F operator=op1 http://127.0.0.1:8000/api/shipments # 2) 拍照1/2 python apps/capture/capture.py SHP0001 image1 python apps/capture/capture.py SHP0001 image2 # 3) OCR(示意用剛才的 image1 路徑) python apps/ocr_worker/worker.py SHP0001 /data/shipments/2025/09/09/SHP0001_image1_*.jpg # 4) 掃一個 QR payload curl -F payload="QR:ABC123" http://127.0.0.1:8000/api/shipments/SHP0001/qrcode # 5) 匯出 curl -L -o SHP0001.zip http://127.0.0.1:8000/api/shipments/SHP0001/export

(對應功能/匯出與驗收)

九、風險與強化點

  • 單據版式多樣:先以模板規則與欄位信心值門檻,低於門檻強制覆核;後續導入 PP-Structure。

  • 反光/陰影:硬體治具與固定光源;拍照前自動曝光/對焦檢查。

  • 多站:本機先存,sync_agent 排程上傳中央 DB/NAS。

需要我把上述骨架打包成 zip 或加上 PaddleOCR 版 worker 與 HID/Serial 讀碼驅動,直接可