您好,登錄后才能下訂單哦!
Druid的單機版安裝參考:https://blog.51cto.com/10120275/2429912
Druid實時接入Kafka的過程
下載、安裝、啟動kafka過程:
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.11-2.2.1.tgz
tar -zxvf kafka_2.11-2.2.1.tgz
ln -s kafka_2.11-2.2.1 kafka
$KAFKA_HOME/kafka-server-start.sh ~/kafka/config/server.properties 1>/dev/null 2>&1 &
創建topic : wikipedia./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
解壓wikiticker-2015-09-12-sampled.json.gz文件,這個步驟是給kafka topic準備輸入文件
cd $DRUID_HOME/quickstart/tutorial
gunzip -k wikiticker-2015-09-12-sampled.json.gz
這個步驟操作完成后,在$DRUID_HOME/quickstart/tutorial文件夾下生成wikiticker-2015-09-12-sampled.json
上圖配置文件如下,其中bootstrap.servers配置kafka地址
{
"type": "kafka",
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
}
}
},
"metricsSpec" : [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": false
},
"ioConfig": {
"topic": "wikipedia",
"replicas": 2,
"taskDuration": "PT10M",
"completionTimeout": "PT20M",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
}
}
接下來要將wikiticker-2015-09-12-sampled.json文件內容,利用kafka生產者腳本寫入wikipedia的topic中
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。