首页
文章
留言
首页
文章
留言
PHP使用Kafka
2017 年 02 月 12 日
后端
PHP
Kafka
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。 #### 安装 librdkafka 库 ```plaintext git clone https://github.com/edenhill/librdkafka.git ./configure make && make install ``` #### 安装 php-rdkafka 扩展 ```plaintext php -m | grep kafka rdkafka ``` #### 生产者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } // 发送消息 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } echo "done\n"; ``` #### 消费者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $oMsg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } } ```
0
相关文章
PHP开发之PDO使用总结
Sphinx使用总结
Redis、MemCache、MongoDB比较
Redis基本使用总结
Kafka使用总结
全部分类
前端
后端
运维
架构
算法
数据库
移动应用
桌面应用
程序开发
热门标签
Git
爬虫
CentOS
Lua
Elasticsearch
iOS
Composer
PHP
Android
GUI
Kubernetes
Kafka
HTML
Qt
Sphinx
MongoDB
CSS
C++
MySQL
JavaScript
Linux
NoSQL
多线程
Python
OpenResty
Redis
Nginx
Supervisor
Docker
Objective-C
macOS
Shell
热门文章
Composer使用总结
PHP开发之字符串处理
Qt实现TCP通讯
Git使用总结
Linux命令总结
JavaScript常用函数总结
iOS开发之面向对象
iOS开发之WiFi传输文件
10种常见的软件架构模式
Elasticsearch详解