首页
文章
留言
首页
文章
留言
PHP使用Kafka
2017 年 02 月 12 日
后端
PHP
Kafka
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。 #### 安装 librdkafka 库 ```plaintext git clone https://github.com/edenhill/librdkafka.git ./configure make && make install ``` #### 安装 php-rdkafka 扩展 ```plaintext php -m | grep kafka rdkafka ``` #### 生产者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } // 发送消息 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } echo "done\n"; ``` #### 消费者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $oMsg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } } ```
0
相关文章
Redis基本使用总结
Python爬虫之Beautiful Soup的使用
PHP开发之PDO使用总结
Redis、MemCache、MongoDB比较
Kafka使用总结
全部分类
前端
后端
运维
架构
算法
数据库
移动应用
桌面应用
程序开发
热门标签
Kubernetes
GUI
Supervisor
HTML
MongoDB
C++
Linux
CentOS
Nginx
爬虫
Python
OpenResty
Objective-C
Kafka
JavaScript
Git
macOS
Sphinx
Android
Elasticsearch
iOS
MySQL
Composer
NoSQL
Lua
多线程
CSS
Qt
Docker
PHP
Shell
Redis
热门文章
PHP开发之PDO使用总结
Kafka使用总结
HTML5常用特性总结
Kubernetes介绍
jquery.tmpl使用总结
10种常见的软件架构模式
Python爬虫之Beautiful Soup的使用
JavaScript常用函数总结
PHP开发之字符串处理
macOS常用命令