您好,登錄后才能下訂單哦!
在單機環境下實現字符串追加函數(Pulsar 2.4.2版本)
1 啟動單機Pulsar
? ? ?$ bin/pulsar-daemon start standalone
2 創建函數
1) 準備環境
? ? 項目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 創建JAVA函數(此函數用于數據源來的topic schema是string,輸出的tiopic schema是string)
? ? ?
? ? ?導出jar包,放到pulsar服務器目錄下,本例子放在 /data/jar/下
3)使用命令行工具加載函數到Pulsar,? ? ? ? ? ? ? ? ? ? ?
? ?bin/pulsar-admin functions create \
? ?--classname test.AppStrFunction \
? ?--jar /data/jar/pf.jar \
? ?--inputs persistent://public/default/tlstest \
? ?--output persistent://public/default/teststr \
? ?--tenant public \
? ?--namespace default \
? ?--name appStrFunction
? ?參數說明:
? ? ? ? ? ? ? ? ? ? ?
參數 | 說明 |
functions | 通知 pulsar broker,函數操作 |
create | 創建函數,默認創建成功后啟動 |
classname | 函數類名稱,需要加上包名 |
jar | 指定 jar 包的運行路徑 |
inputs | 指定 函數 數據的來源在哪里,支持多個 topics 作為輸入 |
output | 如果該 函數 有輸出(有些情況下,function 沒有輸出),指定 function 輸出的 topic,只能有一個輸出 |
tenant | 指定該 函數 運行的租戶名 |
namespace | 指定該 函數 運行的命名空間 |
name | 指定該 函數 運行的名稱 |
停止函數
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
啟動函數
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
刪除函數
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函數的日志在 pulsar安裝目錄 /logs/functions下
3 測試函數
? ?根據前邊函數已成功加載啟動
1)向tlstest主題發送消息? ?
import?java.util.concurrent.TimeUnit; import?org.apache.pulsar.client.api.Producer; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; public?class?SendMsgTest{ ??public?static?void?main(String[]?args){ ??????String?url="pulsar://192.168.1.48:6650"; ??try{ ?????PulsarClient?client?=PulsarClient.builder() ???????????.serviceUrl(url) ???????????.connectionTimeout(10,TimeUnit.SECONDS) ???????????.build(); ?????Producer<String>?producer=client.newProducer(Schema.STRING) ???????????.topic("tlstest") ???????????.sendTimeout(10,TimeUnit.SECONDS) ???????????.producerName("senduser") ???????????.create(); ???????????producer.send("this?is?a?book"); ???????????System.out.print("send?ok"); ???????????client.close(); ??????}catch(Exception?e){ ????????e.printStackTrace(); ??????} ??} }
2)讀取teststr主題消息
? ?
import?org.apache.pulsar.client.api.Consumer; import?org.apache.pulsar.client.api.Message; import?org.apache.pulsar.client.api.PulsarClient; import?org.apache.pulsar.client.api.Schema; import?org.apache.pulsar.client.api.SubscriptionInitialPosition; import?org.apache.pulsar.client.api.SubscriptionType; import?org.apache.pulsar.client.impl.schema.JSONSchema; import?schema.OrderModel; import?com.alibaba.fastjson.JSON; public?class?RecFunTest?{ public?static?void?main(String[]?args)?{ String?url?=?"http://192.168.1.48:8080"; try{ ??PulsarClient?client?=PulsarClient.builder() ????.serviceUrl(url) ????.build(); ?Consumer<String>?consumer=client.newConsumer(Schema.STRING) ????.topic("teststr") ????.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) ????.subscriptionType(SubscriptionType.Exclusive)//訂閱模式??Exclusive(獨占,默認模式)?Failover(災備)Shared(共享) ????.subscriptionName("wbq")//訂閱者名稱 ????.subscribe(); ?while?(true)?{ ???Message<String>?mondmsg?=?consumer.receive(); ???String?msg=mondmsg.getValue(); ????????????????System.out.println("receive?message=:"+msg); ?????????????} ??}catch(Exception?e){ ?????e.printStackTrace(); ??} ?} }
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。