首页
文章
留言
首页
文章
留言
PHP使用Kafka
2017 年 02 月 12 日
后端
PHP
Kafka
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。 #### 安装 librdkafka 库 ```plaintext git clone https://github.com/edenhill/librdkafka.git ./configure make && make install ``` #### 安装 php-rdkafka 扩展 ```plaintext php -m | grep kafka rdkafka ``` #### 生产者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } // 发送消息 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } echo "done\n"; ``` #### 消费者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $oMsg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } } ```
0
相关文章
PHP常用函数总结
Redis、MemCache、MongoDB比较
Nginx常用配置说明
PHP开发之字符串处理
Redis基本使用总结
全部分类
前端
后端
运维
架构
算法
数据库
移动应用
桌面应用
程序开发
热门标签
PHP
Lua
GUI
OpenResty
Android
JavaScript
Linux
爬虫
HTML
Kubernetes
Shell
Composer
多线程
Python
CentOS
NoSQL
CSS
MySQL
Redis
iOS
Qt
Elasticsearch
Objective-C
Git
Docker
MongoDB
Sphinx
macOS
Kafka
C++
Supervisor
Nginx
热门文章
iOS开发之类与类扩展
PHP开发之PDO使用总结
10种常见的软件架构模式
Sphinx使用总结
HTML5常用特性总结
Kubernetes介绍
macOS常用命令
OpenResty+Lua+Kafka收集日志
Elasticsearch详解
Qt实现TCP通讯