

How to Implement WordCount with Kafka + Storm + HBase

Published: 2021-12-09 10:25:16  Source: 億速云  Reads: 124  Author: 小新  Category: Big Data

This article walks through implementing WordCount with Kafka, Storm, and HBase. It is meant as a practical reference for anyone interested in the topic; read on for the full pipeline, from the Maven dependencies to the running topology.

Kafka + Storm + HBase WordCount: a KafkaSpout consumes sentences from a Kafka topic, a split bolt breaks them into words, a count bolt keeps running totals, and an HBase bolt persists each (word, count) pair. The results are stored in HBase as follows:

(1) Table name: wc

(2) Column family: result

(3) RowKey: the word

(4) Field: count
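
The wc table must exist before the topology writes to it. Here is a minimal sketch that creates it with the HBaseAdmin API matching the hbase-client 0.99 dependency used below; the ZooKeeper quorum is an assumption copied from the topology code further down.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as the topology uses later
        conf.set("hbase.zookeeper.quorum", "zeb,yjd,ylh");
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("wc"));
        desc.addFamily(new HColumnDescriptor("result")); // single column family "result"
        if (!admin.tableExists("wc")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}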

1. Solution:

(1) Step 1: prepare the Kafka, Storm, and HBase jars. The Maven dependencies are as follows:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com</groupId>

  <artifactId>kafkaSpout</artifactId>

  <version>0.0.1-SNAPSHOT</version>

   

    <dependencies>

        <dependency>

            <groupId>org.apache.storm</groupId>

            <artifactId>storm-core</artifactId>

            <version>0.9.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.storm</groupId>

            <artifactId>storm-kafka</artifactId>

            <version>0.9.3</version>

        </dependency>

        <dependency>

            <groupId>org.apache.kafka</groupId>

            <artifactId>kafka_2.10</artifactId>

            <version>0.8.1.1</version>

            <exclusions>

                <exclusion>

                    <groupId>org.apache.zookeeper</groupId>

                    <artifactId>zookeeper</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

        <dependency>

            <groupId>org.apache.hbase</groupId>

            <artifactId>hbase-client</artifactId>

            <version>0.99.2</version>

            <exclusions>

                <exclusion>

                    <groupId>org.slf4j</groupId>

                    <artifactId>slf4j-log4j12</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.apache.zookeeper</groupId>

                    <artifactId>zookeeper</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

         

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>

 

        <dependency>

            <groupId>org.apache.curator</groupId>

            <artifactId>curator-framework</artifactId>

            <version>2.5.0</version>

            <exclusions>

                <exclusion>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.slf4j</groupId>

                    <artifactId>slf4j-log4j12</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

                                                                              

        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.7</version>
            <scope>system</scope>
            <systemPath>C:\Program Files\Java\jdk1.7.0_51\lib\tools.jar</systemPath>
        </dependency>

         

    </dependencies>

  

    <repositories>

        <repository>

            <id>central</id>

            <url>http://repo1.maven.org/maven2/</url>

            <snapshots>

                <enabled>false</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>clojars</id>

            <url>https://clojars.org/repo/</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>scala-tools</id>

            <url>http://scala-tools.org/repo-releases</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

        <repository>

            <id>conjars</id>

            <url>http://conjars.org/repo/</url>

            <snapshots>

                <enabled>true</enabled>

            </snapshots>

            <releases>

                <enabled>true</enabled>

            </releases>

        </repository>

    </repositories>

 

    <build>

        <plugins>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.1</version>

                <configuration>

                    <source>1.6</source>

                    <target>1.6</target>

                    <encoding>UTF-8</encoding>

                    <showDeprecation>true</showDeprecation>

                    <showWarnings>true</showWarnings>

                </configuration>

            </plugin>

            <plugin>

                <artifactId>maven-assembly-plugin</artifactId>

                <configuration>

                    <descriptorRefs>

                        <descriptorRef>jar-with-dependencies</descriptorRef>

                    </descriptorRefs>

                    <archive>

                        <manifest>

                            <mainClass></mainClass>

                        </manifest>

                    </archive>

                </configuration>

                <executions>

                    <execution>

                        <id>make-assembly</id>

                        <phase>package</phase>

                        <goals>

                            <goal>single</goal>

                        </goals>

                    </execution>

                </executions>

            </plugin>

        </plugins>

    </build>

</project>
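
One gap worth flagging: the pom above never declares the storm-hbase integration module, yet the topology below uses its HBaseBolt and SimpleHBaseMapper classes. A dependency along the following lines is needed; the coordinates and version here are an assumption, so match them to the Storm distribution you actually run (for Storm 0.9.x this module was also published as com.github.ptgoetz:storm-hbase before it moved into Apache Storm):

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>0.9.3</version>
        </dependency>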

 

(2) Messages arriving from Kafka are split into words by the LevelSplit bolt, and each word is emitted to the next bolt. The code is as follows:


package com.kafka.spout;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelSplit extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // e.g. "the cow jumped over the moon"
        String sentence = tuple.getString(0);
        // emit one tuple per word
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

}
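
Because LevelSplit extends BaseBasicBolt, each input tuple is acked automatically once execute returns, so the bolt needs no explicit ack/fail handling of its own.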

(3) The words emitted by the LevelSplit bolt are tallied in the LevelCount bolt, and each updated (word, count) pair is then sent on to the HBase bolt. The code is as follows:


package com.kafka.spout;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LevelCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
            count = 0;
        count++;
        counts.put(word, count);

        // dump the running totals for debugging
        for (Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "----------->" + e.getValue());
        }
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
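
Note that the counts map is in-process state held by each LevelCount task. The totals are only correct because the topology below routes tuples with a fieldsGrouping on "word", which guarantees every occurrence of a given word reaches the same task; the state is lost (and counting restarts from zero) if the worker is restarted.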

(4) Set up the Kafka and HBase connection details, assemble the whole topology, and submit it. The code is as follows:


package com.kafka.spout;

import java.util.Map;

import com.google.common.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
// from the storm-hbase module (missing in the original listing)
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

public class StormKafkaTopo {
    public static void main(String[] args) {

        // ZooKeeper hosts, Kafka topic "yjd", ZK root "/storm", spout id "kafkaspout"
        BrokerHosts brokerHosts = new ZkHosts("zeb,yjd,ylh");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "yjd", "/storm", "kafkaspout");
        Config conf = new Config();
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        // map tuple fields to the HBase table: RowKey = word, result:count = count
        SimpleHBaseMapper mapper = new SimpleHBaseMapper();
        mapper.withColumnFamily("result");
        mapper.withColumnFields(new Fields("count"));
        mapper.withRowKeyField("word");

        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://zeb:9000/hbase");
        map.put("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");

        // hbase-bolt
        HBaseBolt hBaseBolt = new HBaseBolt("wc", mapper).withConfigKey("hbase.conf");

        conf.setDebug(true);
        conf.put("hbase.conf", map);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));
        builder.setBolt("split", new LevelSplit(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new LevelCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("hbase", hBaseBolt, 1).shuffleGrouping("count");

        if (args != null && args.length > 0) {
            // submit to the cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // run in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topotest1121", conf, builder.createTopology());
            Utils.sleep(1000000);
            cluster.killTopology("Topotest1121");
            cluster.shutdown();
        }
    }
}
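
The spout configuration above references a MessageScheme class that the original listing does not include. A minimal sketch of what it presumably does, assuming each Kafka message is a UTF-8 line of text (the output field name "line" is arbitrary here, since LevelSplit reads the field by position):

package com.kafka.spout;

import java.io.UnsupportedEncodingException;
import java.util.List;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class MessageScheme implements Scheme {

    // turn the raw Kafka message bytes into a one-field tuple
    public List<Object> deserialize(byte[] ser) {
        try {
            return new Values(new String(ser, "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    // LevelSplit reads the field positionally, so the name is only cosmetic
    public Fields getOutputFields() {
        return new Fields("line");
    }
}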

(5) Produce test data from the Kafka console producer, as shown below:

[Screenshot: sending test sentences with the Kafka console producer]
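
For reference, the same test input can also be generated programmatically with the Kafka 0.8 producer API bundled by the kafka_2.10 dependency above. This is only a sketch: the broker address zeb:9092 is an assumption (the host taken from the topology config, the port being Kafka's default).

package com.kafka.spout;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumption: a broker runs on host "zeb" at the default port 9092
        props.put("metadata.broker.list", "zeb:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        // same topic the KafkaSpout consumes
        producer.send(new KeyedMessage<String, String>("yjd", "the cow jumped over the moon"));
        producer.close();
    }
}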

2. Result screenshot:

[Screenshot: word-count output after running the topology]
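
To double-check outside of Storm, the wc table can be read back directly with the hbase-client API. A sketch, assuming the same cluster configuration as above; how the stored bytes decode depends on how SimpleHBaseMapper serialized the count field, so Bytes.toInt is an assumption here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadWcTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zeb:2181,yjd:2181,ylh:2181");
        HTable table = new HTable(conf, "wc");
        // RowKey is the word itself; the count lives in result:count
        Result r = table.get(new Get(Bytes.toBytes("the")));
        byte[] raw = r.getValue(Bytes.toBytes("result"), Bytes.toBytes("count"));
        if (raw != null) {
            System.out.println("the -> " + Bytes.toInt(raw)); // assumption: 4-byte int encoding
        }
        table.close();
    }
}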

3. Problems encountered:

(1) With everything in place, I submitted the topology and ran the code. Error 1 occurred, as follows:

[Screenshot: error 1 stack trace]

Solution: the dependency versions were inconsistent. After aligning them all to the same version, the error was resolved.

(2) Error 2 occurred, as follows:

[Screenshot: error 2 stack trace]

Solution: I had forgotten to start HBase's HMaster and HRegionServer (bin/start-hbase.sh starts both). Once they were running, the problem was resolved.

Thank you for reading this article to the end. I hope this post on "How to Implement WordCount with Kafka + Storm + HBase" has been helpful. Please continue to support 億速云 and follow its industry news channel for more articles like this one!


