首页
文章
留言
首页
文章
留言
PHP使用Kafka
2017 年 02 月 12 日
后端
PHP
Kafka
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。 #### 安装 librdkafka 库 ```plaintext git clone https://github.com/edenhill/librdkafka.git ./configure make && make install ``` #### 安装 php-rdkafka 扩展 ```plaintext php -m | grep kafka rdkafka ``` #### 生产者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } // 发送消息 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } echo "done\n"; ``` #### 消费者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $oMsg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } } ```
0
相关文章
PHP开发之字符串处理
PHP开发之PDO使用总结
Sphinx使用总结
Redis、MemCache、MongoDB比较
Kafka使用总结
全部分类
前端
后端
运维
架构
算法
数据库
移动应用
桌面应用
程序开发
热门标签
Supervisor
Redis
Nginx
Sphinx
Composer
Android
NoSQL
Objective-C
爬虫
HTML
GUI
Qt
Linux
Docker
iOS
CSS
C++
Git
Kubernetes
Kafka
MongoDB
JavaScript
Shell
MySQL
OpenResty
Lua
Elasticsearch
Python
CentOS
macOS
PHP
多线程
热门文章
Supervisor使用总结
10种常见的软件架构模式
Redis基本使用总结
CentOS常用命令总结
iOS开发之WiFi传输文件
iOS开发之定时执行任务
Nginx常用配置说明
Composer使用总结
C/C++基础知识总结
CSS设置图片水平及垂直居中