您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Pulsar IO,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
Apache Pulsar 是業界領先的消息系統。使用消息系統時,一個較為常見的問題就是:將數據移入或移出消息平臺的最佳方法是什么?
當然,用戶可以使用 Pulsar 的 consumer 和 producer API 編寫自定義代碼,來傳輸數據。但除此之外,是否還有其他方法呢?
以下為用戶提出的一些相關問題:
1. 要將數據發布到 Pulsar 或使用 Pulsar 中的數據,我應該在哪里運行相應程序?
2. 要將數據發布到 Pulsar 或使用 Pulsar 中的數據,我應該怎樣運行相應程序?
用戶之所以會提出這些問題,是因為其他消息/發布-訂閱系統沒有提供有組織且容錯的方式來幫助用戶從外部系統輸入數據或將數據輸出到外部系統,因而用戶需要尋求自定義解決方案并手動運行。
為了解決上述問題并簡化這一過程,我們推出了 Pulsar IO。
Pulsar IO 通過利用現有的 Pulsar Functions 框架來輸入/輸出數據。而 Pulsar Functions 框架的所有優勢(如:容錯性、并行性、彈性、負載平衡、按需更新等)都可以直接被 Pulsar 輸入/輸出數據的應用程序所利用。
而且,我們發現經常會出現這樣的情況,用戶花很大功夫(因為他們不是消息系統方面的專家,可能也不想成為這一領域的專家)去編寫自定義程序,用于從消息傳遞系統訪問數據。
自定義編寫這些應用程序不僅會很困難,而且我們發現,許多用戶在嘗試實現執行相同功能的應用程序時,做了相同的工作。歸根結底,消息系統只是用于移動數據的工具,因此,在設計 Pulsar IO 框架時,我們的主要目標之一就是易用性。
我們希望用戶能夠在不編寫任何代碼,也不用同時成為 Pulsar 和外部系統專家的情況下,可以從外部系統輸入數據或將數據輸出到外部系統。
首先,我們定義兩個應用程序,一個作為 source 將數據輸入到 Pulsar ,另一個作為 sink 從 Pulsar 接收數據。
Source 將數據從外部系統導入 Pulsar,而 sink 將數據從 Pulsar 導出到外部系統。具體來看,source 從外部系統讀取數據,并將數據寫入 Pulsar topic,而 sink 從一個或多個 Pulsar topic 讀取數據,并將數據寫入外部系統。
Pulsar IO 框架在現有的 Pulsar functions 框架上運行。單個 source 和 sink 可以像 function 一樣與 Pulsar broker 一起運行,如下圖所示。
因此,Pulsar Functions 框架的所有優勢都適用于 Pulsar IO 框架,即 sink 和 source 應用程序。
正如前面提到的,我們的設計目標包括用戶無需編寫任何自定義應用程序,也無需編寫任何代碼就可以將數據移入或移出 Pulsar。
因此,Pulsar IO 框架中有多種內置 source 和 sink(Kafka、Twitter Firehose、Cassandra、Aerospike 等,還會支持更多),用戶只需使用一個命令便可運行。用戶因此可以關注于業務邏輯,而無需擔心實現細節。
使用 Pulsar IO 框架很容易。用戶可以在命令行界面使用一行簡單的命令啟動內置 source 或 sink。例如,用戶可以用下面的命令來提交 source 到已有的 Pulsar 集群,命令格式如下:
$ ./bin/pulsar-admin source create \ --tenant <tenant> \ --namespace <namespace> \ --name <source-name> \ --destinationTopicName <input-topics> \ --source-type <source-type>
以下示例為運行 twitter firehose source 的命令,用于將 Twitter 中的數據導入 Pulsar:
$ ./bin/pulsar-admin source create \--tenant test \ --namespace ns1 \ --name twitter-source \ --destinationTopicName twitter_data \ --sourceConfigFile examples/twitter.yml \ --source-type twitter
經過以上步驟,用戶即可向 Pulsar 輸入數據,而無需編寫或編譯任何代碼。唯一可能需要的是一個配置文件,用于為該 source 或 sink 指定某些配置。用戶可以通過以下格式的命令向現有的 Pulsar 集群中提交待運行的內置 sink:
$ ./bin/pulsar-admin sink create \ --tenant <tenant> \ --namespace <namespace> \ --name <sink-name> \ --inputs <input-topics> \ --sink-type <sink-type>
以下為運行 Cassandra sink 的示例命令,用于將數據從 Pulsar 導出到 Cassandra:
$ ./bin/pulsar-admin sink create \ --tenant public \ --namespace default \ --name cassandra-test-sink \ --sink-type cassandra \ --sinkConfigFile examples/cassandra-sink.yml \ --inputs test_cassandra
更多關于如何運行 Cassandra source 的信息,參閱快速入門指南:
https://pulsar.apache.org/docs/en/2.1.1-incubating/io-quickstart/
以上命令顯示了如何在“集群”模式下(即作為現有 Pulsar 集群的一部分)運行 source 和 sink。除此之外,還可以在“本地運行”模式下將 source 和 sink 作為獨立進程運行,這一模式會在機器上生成本地進程并且運行 source 或者 sink 的邏輯。
本地運行模式有助于測試和調試,但是,需要用戶自行監控和監督。以下為在本地運行模式下運行 source 的命令示例:
$ ./bin/pulsar-admin sink localrun \ --tenant public \ --namespace default \ --name cassandra-test-sink \ --sink-type cassandra \ --sinkConfigFile examples/cassandra-sink.yml \ --inputs test_cassandra
由于 Pulsar IO 框架在 Pulsar Functions 上運行,因此可以通過更新參數和配置來動態更新 source 或 sink。例如,當希望利用前面提到的 Twitter firehose source 將數據輸入到另一個 Pulsar topic 時,可以執行以下命令:
$ ./bin/pulsar-admin source update \--tenant test \ --namespace ns1 \ --name twitter-source \ --destinationTopicName twitter_data_2 \ --sourceConfigFile examples/twitter.yml \ --source-type twitter
?
也可以使用同樣格式的命令更新 sink。大多數 source 和 sink 的更新都可以在運行時進行配置,從而簡化修改、測試、部署等流程。
如果要自定義實現一個小眾的用例,則可以通過實現一個簡單的界面來創建 source 或 sink。但是,Pulsar IO 的目的是幫助用戶直接使用現有的內置 source 或 sink,而不必自己手動實現 source 或 sink。
???? 實現自定義 source
要創建自定義 source,用戶需要編寫一個實現 source 接口的 Java 類:
public interface Source<T> extends AutoCloseable {/** * Open source with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ void open(final Map<String, Object> config) throws Exception; /** * Reads the next message from source. * If source does not have any new messages, this call should block. * @return next message from source. The return result should never be null * @throws Exception */ Record<T> read() throws Exception;}
這是一個 source 實現的簡單示例:
public class TestSource implements Source<Integer> { private int i = 0; @Override public void open(Map<String, Object> config) throws Exception { } @Override public Record<Integer> read() throws Exception { return () -> i++; } @Override public void close() throws Exception { }}
在上面的 source 示例中,單調遞增的整數被傳入到 Pulsar。
實現 “Record” 接口的對象需要通過 “read” 方法返回,因為 “Record” 接口包含可用于實現不同消息傳遞語義或保證的字段,例如 exactly-once/effectively-once。在后續文章中,我將詳細討論如何執行此操作。
???? 實現自定義 sink
要創建自定義 sink,用戶需要編寫一個實現 sink 接口的 Java 類:
public interface Sink<T> extends AutoCloseable{ /** * Open Sink with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ void open(final Map<String, Object> config) throws Exception; /** * Write a message to Sink * @param inputRecordContext Context of value * @param value value to write to sink * @throws Exception */ void write(RecordContext inputRecordContext, T value) throws Exception;}
例如,一個簡單的 sink 實現:
public class TestSink implements Sink<String> {private static final String FILENAME = "/tmp/test-out";private BufferedWriter bw = null;private FileWriter fw = null;@Overridepublic void open(Map<String, Object> config) throws Exception { File file = new File(FILENAME);// if file doesnt exists, then create itif (!file.exists()) { file.createNewFile(); } fw = new FileWriter(file.getAbsoluteFile(), true); bw = new BufferedWriter(fw); }@Overridepublic void write(RecordContext inputRecordContext, String value) throws Exception {try { bw.write(value); bw.flush(); } catch (IOException e) {throw new RuntimeException(e); } }@Overridepublic void close() throws Exception {try {if (bw != null) bw.close();if (fw != null) fw.close(); } catch (IOException ex) { ex.printStackTrace(); } }}
以上示例說明 sink 如何從 Pulsar 讀取數據并寫入文件。與 source 接口類似,sink 接口中的 “write” 方法有一個 RecordContext 參數。此參數為 sink 提供需要寫入外部系統的值的 context。
RecordContext 參數可用于實現能夠提供不同級別的消息傳遞語義或保證(如:Exactly-once/Effective-once)的 sink。在后續文章中,我們將對此進行更深入的討論。
用戶可以通過類似于運行內置 source 和 sink 的方式來提交自定義 source 和 sink:
$ ./bin/pulsar-admin source create \ --className <classname> \ --jar <jar-location> \ --tenant <tenant> \ --namespace <namespace> \ --name <source-name> \ --destinationTopicName <output-topic>
命令示例如下:
$ ./bin/pulsar-admin source create \ --className org.apache.pulsar.io.twitter.TwitterFireHose \ --jar \~/application.jar \ --tenant test \ --namespace ns1 \ --name twitter-source \ --destinationTopicName twitter_data
在現有 Pulsar 集群中提交待運行的自定義 sink 的命令格式如下:
$ ./bin/pulsar-admin sink create \--className <classname> \--jar <jar-location> \--tenant test \--namespace <namespace> \--name <sink-name> \--inputs <input-topics>
命令示例:
$ ./bin/pulsar-admin sink create \--className org.apache.pulsar.io.cassandra \--jar \~/application.jar \--tenant test \--namespace ns1 \--name cassandra-sink \--inputs test_topic```
如上所述,Pulsar IO 框架在現有的 Pulsar Functions 框架上運行。Pulsar IO 充分利用了現有的 Pulsar Functions 框架。作為 Pulsar IO 的組成部分,source 和 sink 擁有 Pulsar Functions 的所有優勢:
關于Pulsar IO就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。