PHPackages                             phoneda/tp5-mqtt-collector - PHPackages - PHPackages  [Skip to content](#main-content)[PHPackages](/)[Directory](/)[Categories](/categories)[Trending](/trending)[Leaderboard](/leaderboard)[Changelog](/changelog)[Analyze](/analyze)[Collections](/collections)[Log in](/login)[Sign up](/register)

1. [Directory](/)
2. /
3. [Utility &amp; Helpers](/categories/utility)
4. /
5. phoneda/tp5-mqtt-collector

ActiveLibrary[Utility &amp; Helpers](/categories/utility)

phoneda/tp5-mqtt-collector
==========================

MQTT 数据采集扩展包 for ThinkPHP 5.0

1.0.0(7mo ago)01MITPHPPHP &gt;=5.6

Since Sep 27Pushed 7mo agoCompare

[ Source](https://github.com/hndaheng-cpu/tp5-mqtt-controller)[ Packagist](https://packagist.org/packages/phoneda/tp5-mqtt-collector)[ RSS](/packages/phoneda-tp5-mqtt-collector/feed)WikiDiscussions main Synced 1mo ago

READMEChangelogDependencies (2)Versions (2)Used By (0)

ThinkPHP 5.0 MQTT 数据采集扩展包
=========================

[](#thinkphp-50-mqtt-数据采集扩展包)

**tp5-mqtt-collector** 是一个专为 **ThinkPHP 5.0** 设计的 MQTT 数据采集扩展包，适用于物联网(IoT)数据采集场景。
支持自动解析 JSON 消息、动态映射数据库表、自动建表、字段类型识别和索引建议。

---

一、功能亮点
------

[](#一功能亮点)

✅ **MQTT 协议支持**：基于 Workerman/MQTT 实现高性能客户端
✅ **断线自动重连**：网络异常或 Broker 断开时自动尝试重连
✅ **Redis 消息队列**：缓存消息，防止数据丢失
✅ **批量写入 MySQL**：提高数据库写入性能
✅ **JSON 自动解析**：自动识别 JSON 格式消息
✅ **字段类型自动识别**：根据值类型自动映射 MySQL 字段类型
✅ **自动建表/添加字段**：首次收到消息时自动创建数据表和缺失字段
✅ **自动索引建议**：根据数据特征自动建议并创建索引
✅ **ThinkPHP 命令行启动**：符合 ThinkPHP 5.0 开发习惯

---

二、系统架构
------

[](#二系统架构)

 ```
graph TD
    A[MQTT Broker] -->|发布消息| B[MQTT 客户端(Workerman/MQTT)]
    B -->|断线重连| A
    B -->|接收消息| C[消息解析器(Parser.php)]
    C -->|结构化数据| D[Redis 消息队列(RedisQueue.php)]
    D -->|批量触发/定时触发| E[数据库处理器(DbHandler.php)]
    E -->|自动建表/加字段| F[MySQL 数据库]
    E -->|自动索引建议| F
    G[ThinkPHP 命令行] -->|启动/停止| B
    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B fill:#9f9,stroke:#333,stroke-width:2px
    style D fill:#99f,stroke:#333,stroke-width:2px
    style F fill:#ff9,stroke:#333,stroke-width:2px
```

      Loading ---

三、消息处理流程
--------

[](#三消息处理流程)

 ```
sequenceDiagram
    participant Broker as MQTT Broker
    participant Client as MQTT 客户端
    participant Parser as 消息解析器
    participant Redis as Redis 队列
    participant Db as 数据库处理器
    participant MySQL as MySQL 数据库

    Client->>Broker: 建立连接并订阅主题
    Broker->>Client: 推送消息 (PUBLISH)
    Client->>Parser: 传递消息 (topic + payload)
    Parser->>Parser: 解析 JSON + 匹配主题规则
    Parser->>Redis: 存入队列 (带 table 标识)
    Redis->>Db: 批量出队 (buffer_size 或超时)
    Db->>MySQL: 检查表结构 (自动建表/加字段)
    Db->>MySQL: 批量插入数据
    Db->>MySQL: 分析数据特征创建索引
```

      Loading ---

四、安装方法
------

[](#四安装方法)

### 1. 安装扩展包

[](#1-安装扩展包)

```
composer require yourname/tp5-mqtt-collector
```

### 2. 复制配置文件

[](#2-复制配置文件)

```
cp vendor/yourname/tp5-mqtt-collector/config/mqtt.php application/config/mqtt.php
```

### 3. 注册命令

[](#3-注册命令)

在 `application/command.php` 文件中添加：

```
return [
    'think\mqtt\command\MqttConsumer',
];
```

### 4. 启动服务

[](#4-启动服务)

```
php think mqtt:consumer
```

### 5. 后台运行

[](#5-后台运行)

```
php think mqtt:consumer start -d
```

---

五、配置说明
------

[](#五配置说明)

配置文件位置：`application/config/mqtt.php`

```
return [
    'mqtt' => [
        'host'      => '127.0.0.1', // MQTT Broker 地址
        'port'      => 1883,        // MQTT 端口
        'username'  => '',          // 用户名
        'password'  => '',          // 密码
        'client_id' => 'tp_mqtt_' . uniqid(),
        'topics'    => ['sensor/#', 'device/#'], // 订阅的主题
    ],
    'redis' => [
        'host'     => '127.0.0.1',
        'port'     => 6379,
        'password' => '',
        'db'       => 0,
    ],
    'buffer_size'    => 100, // 批量写入数量
    'buffer_timeout' => 5,   // 批量写入超时时间(秒)
    'index_suggestion' => [
        'enabled'         => true,   // 是否启用索引建议
        'min_selectivity' => 0.1,    // 最低选择性要求
        'check_frequency' => 1000,   // 检查频率
    ],
    'topic_map' => [
        'sensor/+/temp' => [
            'table'    => 'sensor_temp',
            'fields'   => ['device_id', 'temperature', 'humidity'],
            'required' => ['device_id', 'temperature']
        ],
        'device/+/status' => [
            'table'    => 'device_status',
            'fields'   => ['device_id', 'status', 'battery'],
            'required' => ['device_id', 'status']
        ]
    ],
    'default_table' => 'mqtt_messages', // 默认表
];
```

---

六、使用示例
------

[](#六使用示例)

### 1. 发布 MQTT 消息

[](#1-发布-mqtt-消息)

使用 `mosquitto_pub` 工具发布测试数据：

```
# 发布传感器温度数据
mosquitto_pub -t "sensor/dev001/temp" -m '{"device_id":"dev001","temperature":25.6,"humidity":60}'

# 发布设备状态数据
mosquitto_pub -t "device/dev002/status" -m '{"device_id":"dev002","status":1,"battery":3.7}'
```

### 2. 数据库验证

[](#2-数据库验证)

扩展包会自动创建对应的数据表并写入数据：

```
-- 查看传感器数据表
SELECT * FROM sensor_temp;

-- 查看设备状态表
SELECT * FROM device_status;
```

---

七、注意事项
------

[](#七注意事项)

- **数据库编码**：建议使用 `utf8mb4` 编码，支持中文和 Emoji
- **权限问题**：确保数据库用户有建表、修改表结构的权限
- **Redis**：确保 Redis 服务正常运行且配置正确
- **性能优化**：可根据服务器性能调整 `buffer_size` 和 `buffer_timeout` 参数

---

八、常见问题
------

[](#八常见问题)

1. **MQTT 连接失败**：检查 Broker 地址、端口和认证信息
2. **Redis 连接失败**：检查 Redis 服务是否运行
3. **数据表创建失败**：检查数据库用户权限
4. **中文乱码**：确保数据库和表使用 utf8mb4 编码
5. **性能问题**：调整批量写入参数

---

九、许可证
-----

[](#九许可证)

MIT License

---

十、致谢
----

[](#十致谢)

- [Workerman/MQTT](https://github.com/walkor/mqtt) - PHP MQTT 客户端库
- [ThinkPHP](https://thinkphp.cn) - 优秀的 PHP 开发框架
- [Predis](https://github.com/nrk/predis) - PHP Redis 客户端

---

###  Health Score

26

—

LowBetter than 43% of packages

Maintenance63

Regular maintenance activity

Popularity1

Limited adoption so far

Community8

Small or concentrated contributor base

Maturity30

Early-stage or recently created project

 Bus Factor1

Top contributor holds 80% of commits — single point of failure

How is this calculated?**Maintenance (25%)** — Last commit recency, latest release date, and issue-to-star ratio. Uses a 2-year decay window.

**Popularity (30%)** — Total and monthly downloads, GitHub stars, and forks. Logarithmic scaling prevents top-heavy scores.

**Community (15%)** — Contributors, dependents, forks, watchers, and maintainers. Measures real ecosystem engagement.

**Maturity (30%)** — Project age, version count, PHP version support, and release stability.

###  Release Activity

Cadence

Unknown

Total

1

Last Release

227d ago

### Community

Maintainers

![](https://www.gravatar.com/avatar/c04d2d23afbcf57eeb39614804894424685444d1588e07dc98abc5927b98b7fc?d=identicon)[hndaheng-cpu](/maintainers/hndaheng-cpu)

---

Top Contributors

[![hndaheng](https://avatars.githubusercontent.com/u/33999211?v=4)](https://github.com/hndaheng "hndaheng (4 commits)")[![hndaheng-cpu](https://avatars.githubusercontent.com/u/234710649?v=4)](https://github.com/hndaheng-cpu "hndaheng-cpu (1 commits)")

### Embed Badge

![Health badge](/badges/phoneda-tp5-mqtt-collector/health.svg)

```
[![Health](https://phpackages.com/badges/phoneda-tp5-mqtt-collector/health.svg)](https://phpackages.com/packages/phoneda-tp5-mqtt-collector)
```

###  Alternatives

[grumpydictator/firefly-iii

Firefly III: a personal finances manager.

22.8k69.3k](/packages/grumpydictator-firefly-iii)[blair2004/nexopos

The Free Modern Point Of Sale System build with Laravel, TailwindCSS and Vue.js.

1.2k2.3k](/packages/blair2004-nexopos)[php-junior/laravel-video-chat

Laravel Video Chat using Socket.IO and WebRTC

82018.1k](/packages/php-junior-laravel-video-chat)[vcian/pulse-active-sessions

A Laravel Pulse card to show active user session.

11469.2k](/packages/vcian-pulse-active-sessions)[splitsoftware/split-sdk-php

Split SDK for PHP

161.3M2](/packages/splitsoftware-split-sdk-php)[vwo/vwo-php-sdk

VWO server side sdk

12166.4k](/packages/vwo-vwo-php-sdk)

PHPackages © 2026

[Directory](/)[Categories](/categories)[Trending](/trending)[Changelog](/changelog)[Analyze](/analyze)
