91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Pulsar Function 例子

發布時間:2020-02-28 21:05:11 來源:網絡 閱讀:327 作者:qq5dc264c690eab 欄目:大數據

在單機環境下實現字符串追加函數(Pulsar 2.4.2版本)

1 啟動單機Pulsar

? ? ?$ bin/pulsar-daemon start standalone

2 創建函數

1) 準備環境

? ? 項目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'

2) 創建JAVA函數(此函數用于數據源來的topic schema是string,輸出的tiopic schema是string)

? ? ?Pulsar Function 例子

? ? ?導出jar包,放到pulsar服務器目錄下,本例子放在 /data/jar/下

3)使用命令行工具加載函數到Pulsar,? ? ? ? ? ? ? ? ? ? ?

? ?bin/pulsar-admin functions create \

? ?--classname test.AppStrFunction \

? ?--jar /data/jar/pf.jar \

? ?--inputs persistent://public/default/tlstest \

? ?--output persistent://public/default/teststr \

? ?--tenant public \

? ?--namespace default \

? ?--name appStrFunction

? ?參數說明:

? ? ? ? ? ? ? ? ? ? ?

參數
說明
functions通知 pulsar broker,函數操作
create創建函數,默認創建成功后啟動
classname函數類名稱,需要加上包名
jar指定 jar 包的運行路徑
inputs指定 函數 數據的來源在哪里,支持多個 topics 作為輸入
output如果該 函數 有輸出(有些情況下,function 沒有輸出),指定 function 輸出的 topic,只能有一個輸出
tenant指定該 函數 運行的租戶名
namespace指定該 函數 運行的命名空間
name指定該 函數 運行的名稱
以下是函數相關其他操作

停止函數

bin/pulsar-admin functions stop \

--tenant public \

--namespace default \

--name appStrFunction

啟動函數

bin/pulsar-admin functions start \

--tenant public \

--namespace default \

--name appStrFunction

刪除函數

bin/pulsar-admin functions delete \

--tenant public \

--namespace default \

--name appStrFunction

函數的日志在 pulsar安裝目錄 /logs/functions下

3 測試函數

? ?根據前邊函數已成功加載啟動

1)向tlstest主題發送消息? ?

import?java.util.concurrent.TimeUnit;
import?org.apache.pulsar.client.api.Producer;
import?org.apache.pulsar.client.api.PulsarClient;
import?org.apache.pulsar.client.api.Schema;
public?class?SendMsgTest{
??public?static?void?main(String[]?args){
??????String?url="pulsar://192.168.1.48:6650";
??try{
?????PulsarClient?client?=PulsarClient.builder()
???????????.serviceUrl(url)
???????????.connectionTimeout(10,TimeUnit.SECONDS)
???????????.build();
?????Producer<String>?producer=client.newProducer(Schema.STRING)
???????????.topic("tlstest")
???????????.sendTimeout(10,TimeUnit.SECONDS)
???????????.producerName("senduser")
???????????.create();
???????????producer.send("this?is?a?book");
???????????System.out.print("send?ok");
???????????client.close();
??????}catch(Exception?e){
????????e.printStackTrace();
??????}
??}
}

2)讀取teststr主題消息

? ?

import?org.apache.pulsar.client.api.Consumer;
import?org.apache.pulsar.client.api.Message;
import?org.apache.pulsar.client.api.PulsarClient;
import?org.apache.pulsar.client.api.Schema;
import?org.apache.pulsar.client.api.SubscriptionInitialPosition;
import?org.apache.pulsar.client.api.SubscriptionType;
import?org.apache.pulsar.client.impl.schema.JSONSchema;
import?schema.OrderModel;
import?com.alibaba.fastjson.JSON;
public?class?RecFunTest?{
public?static?void?main(String[]?args)?{
String?url?=?"http://192.168.1.48:8080";
try{
??PulsarClient?client?=PulsarClient.builder()
????.serviceUrl(url)
????.build();
?Consumer<String>?consumer=client.newConsumer(Schema.STRING)
????.topic("teststr")
????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨占,默認模式)?Failover(災備)Shared(共享)
????.subscriptionName("wbq")//訂閱者名稱
????.subscribe();
?while?(true)?{
???Message<String>?mondmsg?=?consumer.receive();
???String?msg=mondmsg.getValue();
????????????????System.out.println("receive?message=:"+msg);
?????????????}
??}catch(Exception?e){
?????e.printStackTrace();
??}
?}
}


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阳谷县| 浮山县| 德昌县| 双城市| 邢台县| 寻甸| 阿拉善右旗| 光泽县| 泊头市| 巢湖市| 镇平县| 清苑县| 阿拉善左旗| 新河县| 永昌县| 财经| 克拉玛依市| 尖扎县| 柘城县| 津市市| 南涧| 清原| 广德县| 南靖县| 泰顺县| 栾川县| 察隅县| 岳阳县| 泰州市| 玉田县| 丰宁| 景宁| 成武县| 鸡东县| 汤原县| 乌兰察布市| 枝江市| 仙桃市| 哈巴河县| 凯里市| 凉城县|