日韩一区,国产二区,欧美三区,日本大片在线看黄a∨免费,欧美人体做爰大胆视频,欧洲美女黑人粗性暴交视频,日,韩,欧美一区二区三区

首頁>文檔>技術(shù)文檔>PHP使用kafka的操作是什么?

此組別內(nèi)的文章

需要支持?

如果通過文檔沒辦法解決您的問題,請?zhí)峤还潍@取我們的支持!

PHP使用kafka的操作是什么?

 Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這篇文章主要給大家介紹PHP中使用kafka的操作,文中示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)和理解kafka的使用有一定的幫助,感興趣的朋友接下來一起跟隨小編看看吧。

    本文并沒有kafka的安裝教程,本文是針對已經(jīng)安裝kafka及其配置好kafka的php拓展并且使用laravel框架進(jìn)行開發(fā)項(xiàng)目,配置一個(gè)可供laravel框架使用的生產(chǎn)及消費(fèi)者類.

    以下代碼修改自本站的YII框架關(guān)于kafka類的代碼,經(jīng)過測試使用在本人的項(xiàng)目中,可正常運(yùn)行,larvael版本:5.6 代碼放置larvael框架位置:app/Tools/Kafka.php

<?php
namespace App\Tools;
 
use Illuminate\Config\Repository;
 
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
 
use Illuminate\Http\Request;
 
class Kafka
{
  public $broker_list = '127.0.0.1';//配置kafka,可以用逗號(hào)隔開多個(gè)kafka
  public $topic = 'test';//管道名稱
  public $partition = 0;
 
  protected $producer = null;
  protected $consumer = null;
 
  public function __construct()
  {
    if (empty($this->broker_list)) {
      throw new InvalidConfigException("broker not config");
    }
    $rk = new \RdKafka\Producer();
    if (empty($rk)) {
      throw new InvalidConfigException("producer error");
    }
    $rk->setLogLevel(LOG_DEBUG);
    if (!$rk->addBrokers($this->broker_list)) {
      throw new InvalidConfigException("producer error");
    }
    $this->producer = $rk;
  }
 
  /**
   * 生產(chǎn)者
   * @param array $messages
   * @return mixed
   */
  public function send($messages = [],$topic)
  {
    $topic = $this->producer->newTopic($topic);
    return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  }
 
  /**
   * 消費(fèi)者
   */
  public function consumer($object, $callback){
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 0);
    $conf->set('metadata.broker.list', $this->broker_list);
 
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
 
    $conf->setDefaultTopicConf($topicConf);
 
    $consumer = new \RdKafka\KafkaConsumer($conf);
 
    $consumer->subscribe([$this->topic]);
 
    echo "waiting for messages.....\n";
    while(true) {
      $message = $consumer->consume(120*1000);
      switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
          echo "message payload....";
          $object->$callback($message->payload);
          break;
      }
      sleep(1);
    }
  }
}
?>

    在控制器中如何使用:

    首先再頭部導(dǎo)入這個(gè)類:use App\Tools\Kafka;

    下面是使用生產(chǎn)者實(shí)例:

public function test(){
 
   $topic = 'tool';//輸入使用管道名稱
   $data['shop_id'] = 58;
   $data['bar_code']=586;
   $data['goods_num'] = 1;
   $data['goods_unit'] = '個(gè)';
 
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//傳入數(shù)組會(huì)自動(dòng)轉(zhuǎn)換json
var_dump($Error_Msg);
 
 
  }

    下面是消費(fèi)者實(shí)例,消費(fèi)者我這里使用了的是php腳本進(jìn)行的操作:

<?php
 
$conf = new RdKafka\Conf();
 
$conf->set('group.id', 'myConsumerGroup');
 
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
 
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
 
$topic = $rk->newTopic("tool", $topicConf);//讀取的管道
 
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 
while (true) {
  $message = $topic->consume(0, 120*10000);
  switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //沒有錯(cuò)誤打印信息
      $message = json_decode(json_encode($message),true);
      $data = json_decode($message['payload'],true);
      var_dump($data);
      break;
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "等待接收信息\n";
      break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "超時(shí)\n";
      break;
    default:
      throw new \Exception($message->errstr(), $message->err);
      break;
  }
 sleep(1);
}
 
?>
0 條回復(fù) A文章作者 M管理員
    暫無討論,說說你的看法吧
QQ客服
  • QQ176363189 點(diǎn)擊這里給我發(fā)消息
旺旺客服
  • 速度網(wǎng)絡(luò)服務(wù)商 點(diǎn)這里給我發(fā)消息
電子郵箱
  • sudu@yunjiasu.cc
微信客服
  • suduwangluo