您好,登錄后才能下訂單哦!
小編給大家分享一下如何使用kafka connect將數據批量寫到hdfs,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
kafka-connect是以單節點模式運行,即standalone。
一. 首先,先對kafka和kafka connect做一個簡單的介紹
kafka:Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。比較直觀的解釋就是其有一個生產者(producer)和一個消費者(consumer)。可以將kafka想象成一個數據容器,生產者負責發送數據到這個容器中,而消費者從容器中取出數據,在將數據做處理,如存儲到hdfs。
kafka connect:Kafka Connect是一種用于在Kafka和其他系統之間可擴展的、可靠的流式傳輸數據的工具。它使得能夠快速定義將大量數據集合移入和移出Kafka的連接器變得簡單。即適合批量數據導入導出操作。
二. 下面將介紹如何用kafka connect將數據寫入到hdfs中。包括在這個過程中可能碰到的一些問題說明。
首先啟動kafka-connect:
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
這個命令后面兩個參數,
第一個是指定啟動的模式,有分布式和單節點兩種,這里是單節點。kafka自帶,放于config目錄下。
第二個參數指向描述connector的屬性的文件,可以有多個,這里只有一個connector用來寫入到hdfs。需要自己創建。接下來看看connector1.properties的內容,
name="test" #該connector的名字
#將自己按connect接口規范編寫的代碼打包后放在kafka/libs目錄下,再根據項目結構引用對應connector
connector.class=hdfs.HdfsSinkConnector
#Task是導入導出的具體實現,這里是指定多少個task來并行運行導入導出作業,由多線程實現。由于hdfs中一個文件每次只能又一個文件操作,所以這里只能是1
tasks.max=1
#指定從哪個topic讀取數據,這些其實是用來在connector或者task的代碼中讀取的。
topics=test
#指定key以那種方式轉換,需和Producer發送方指定的序列化方式一致
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.json.JsonConverter #同上
hdfs.url=hdfs://127.0.0.1:9000 #hdfs的url路徑,在Connector中會被讀取
hdfs.path=/test/file #hdfs文件路徑,同樣Connector中被讀取
key.converter.schemas.enable=true #稍后介紹,可以true也可以false,影響傳輸格式
value.converter.schemas.enable=true #稍后介紹,可以true也可以false
三. 接下來看代碼,connect主要是導入導出兩個概念,導入是source,導出時Sink。這里只使用Sink,不過Source和Sink的實現其實基本相同。
實現Sink其實不難,實現對應的接口,即SinkConnector和SinkTask兩個接口,再打包放到kafka/libs目錄下即可。其中SinkConnector只有一個,而Task可以有多
先是Connector
public class HdfsSinkConnector extends SinkConnector {
//這兩項為配置hdfs的urlh和路徑的配置項,需要在connector1.properties中指定
public static final String HDFS_URL = "hdfs.url";
public static final String HDFS_PATH = "hdfs.path";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")
.define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");
private String hdfsUrl;
private String hdfsPath;
@Override
public String version() {
return AppInfoParser.getVersion();
}
//start方法會再初始的時候執行一次,這里主要用于配置
@Override
public void start(Map<String, String> props) {
hdfsUrl = props.get(HDFS_URL);
hdfsPath = props.get(HDFS_PATH);
}
//這里指定了Task的類
@Override
public Class<? extends Task> taskClass() {
return HdfsSinkTask.class;
}
//用于配置Task的config,這些都是會在Task中用到
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>();
if (hdfsUrl != null)
config.put(HDFS_URL, hdfsUrl);
if (hdfsPath != null)
config.put(HDFS_PATH, hdfsPath);
configs.add(config);
}
return configs;
}
//關閉時的操作,一般是關閉資源。
@Override
public void stop() {
// Nothing to do since FileStreamSinkConnector has no background monitoring.
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
接下來是Task
public class HdfsSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class);
private String filename;
public static String hdfsUrl;
public static String hdfsPath;
private Configuration conf;
private FSDataOutputStream os;
private FileSystem hdfs;
public HdfsSinkTask(){
}
@Override
public String version() {
return new HdfsSinkConnector().version();
}
//Task開始會執行的代碼,可能有多個Task,所以每個Task都會執行一次
@Override
public void start(Map<String, String> props) {
hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);
hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);
System.out.println("----------------------------------- start--------------------------------");
conf = new Configuration();
conf.set("fs.defaultFS", hdfsUrl);
//這兩個是與hdfs append相關的設置
conf.setBoolean("dfs.support.append", true);
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
try{
hdfs = FileSystem.get(conf);
// connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);
os = hdfs.append(new Path(hdfsPath));
}catch (IOException e){
System.out.println(e.toString());
}
}
//核心操作,put就是將數據從kafka中取出,存放到其他地方去
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
log.trace("Writing line to {}: {}", logFilename(), record.value());
try{
System.out.println("write info------------------------" + record.value().toString() + "-----------------");
os.write((record.value().toString()).getBytes("UTF-8"));
os.hsync();
}catch(Exception e){
System.out.print(e.toString());
}
}
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
try{
os.hsync();
}catch (Exception e){
System.out.print(e.toString());
}
}
//同樣是結束時候所執行的代碼,這里用于關閉hdfs資源
@Override
public void stop() {
try {
os.close();
}catch(IOException e){
System.out.println(e.toString());
}
}
private String logFilename() {
return filename == null ? "stdout" : filename;
}
}
這里重點提一下,因為在connector1.propertise中設置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter,所以不能用命令行形式的
producer發送數據,而是要用程序的方式,并且在producer總也要設置key的序列化形式為org.apache.kafka.common.serialization.ByteArraySerializer。
編碼完成,先用idea以開發程序與依賴包分離的形式打包成jar包,然后將程序對應的jar包(一般就是“項目名.jar”)放到kafka/libs目錄下面,這樣就能被找到。
四. 接下來對這個過程的問題做一個匯總。
1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的問題。
這個選項默認在connect-standalone.properties中是true的,這個時候發送給topic的Json格式是需要使用avro格式,具體情況可以百度,這里給出一個樣例。
{ "schema": { "type": "struct", "fields": [{ "type": "int32", "optional": true, "field": "c1" }, { "type": "string", "optional": true, "field": "c2" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" }, { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }], "optional": false, "name": "foobar" }, "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
主要就是schema和payload這兩個,不按照這個格式會報錯如下
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
如果想發送普通的json格式而不是avro格式的話,很簡單key.converter.schemas.enable和value.converter.schemas.enable設置為false就行。這樣就能發送普通的json格式數據。
2.在啟動的過程中出現各種各樣的java.lang.ClassNotFoundException。
在啟動connector的時候,一開始總是會報各個各樣的ClassNotFoundException,不是這個包就是那個包,查找問題一直說要么缺少包要么是包沖突。這個是什么原因呢?
其實歸根結底還是依賴沖突的問題,因為kafka程序自定義的類加載器加載類的目錄是在kafka/libs中,而寫到hdfs需要hadoop的包。
我一開始的做法是將hadoop下的包路徑添加到CLASSPATH中,這樣子問題就來了,因為kafka和hadoop的依賴包是有沖突的,比如hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar,兩個jar包版本不同,而我們是在kafka程序中調用hdfs,所以當jar包沖突時應該優先調用kafka的。但是注意kafka用的是程序自定義的類加載器,其優先級是低于CLASSPATH路徑下的類的,就是說加載類時會優先加載CLASSPATH下的類。這樣子就有問題了。
我的解決方案時將kafka和hadoop加載的jar包路徑都添加到CLASSPATH中,并且kafka的路徑寫在hadoop前面,這樣就可以啟動connector成功。
以上是“如何使用kafka connect將數據批量寫到hdfs”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。