您好,登錄后才能下訂單哦!
小編給大家分享一下如何實現基于Flink實時數據處理,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
基于Flink 1.11的網絡流量實時解析,主要針對基于Pcap的原始網絡流量數據進行TCP/UDP/ICMP的協議數據實時解析,并將解析數據裝成數據幀Frame,以便進行實時網絡流量分析。
為完成以上功能,需要了解Pcap數據解析、TCP/UDP層協議解析、Flink的序列化和反序列化、Flink自定義函數以及基于Stream sql的Flink實時數據分析。
要進行基于Pcap格式的網絡流量數據解析,就必須了解Pcap文件格式定義:
如上所示,標準Pcap數據由Pcap文件頭、數據楨Frame頭、數據楨Frame組成。
在Pcap文件頭中:Magic :0x1A2B3C 4D,用于表示Pcap數據的開始;Major:用于標示Pcap數據主版本號;Minor:用于標示Pcap數據次版本號;ThisZone:本地標準時間;SigFigs: 時間戳精度;SnapLen:最大的存儲長度;LinkType:鏈路類型。
在數據楨頭中:Timestamp1:時間戳高位,精確到S;Timestamp2:時間戳低位,精確到ms;CapLen:當前數據楨長度;
Len:網絡中實際數據楨的長度。
注意:目前LinkType鏈路類型,支持EN10MB、RAW、LOOP、LINUX_SLI;通過以上基本結構,在Pcap文件頭中,我們獲取最有用的信息即時LinkType,后面我們需要根據不同的LinkType類型,進行數據楨Frame的解析。
除此之外,根據數據楨頭,可以獲得數據楨的封裝時間;
這里根據以太網數據楨類型為例:也就是Ipv4、Ipv6、ARP數據楨,如上圖所示,該類型的數據楨數據部分的偏移是14。如果是Ipv4或者Ipv6的協議類型,可以解析獲取Mac地址。接下來,其實就是解析TCP/IP層的協議。
// 獲取TCP頭大小tcpOrUdpHeaderSize = getTcpHeaderLength(packetData, ipStart + ipHeaderLen);packet.put(Packet.TCP_HEADER_LENGTH, tcpOrUdpHeaderSize);// Store the sequence and acknowledgement numbers --M// 獲取TCP 請求序列號packet.put(Packet.TCP_SEQ, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_SEQ_OFFSET));// 獲取TCP 確認序列號packet.put(Packet.TCP_ACK, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_ACK_OFFSET));// Flags stretch two bytes starting at the TCP header offsetint flags = PcapReaderUtil.convertShort(new byte[] { packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET],packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET + 1] })& 0x1FF; // Filter first 7 bits. First 4 are the data offset and the other 3 reserved for future use.packet.put(Packet.TCP_FLAG_NS, (flags & 0x100) == 0 ? false : true);packet.put(Packet.TCP_FLAG_CWR, (flags & 0x80) == 0 ? false : true);packet.put(Packet.TCP_FLAG_ECE, (flags & 0x40) == 0 ? false : true);packet.put(Packet.TCP_FLAG_URG, (flags & 0x20) == 0 ? false : true);packet.put(Packet.TCP_FLAG_ACK, (flags & 0x10) == 0 ? false : true);packet.put(Packet.TCP_FLAG_PSH, (flags & 0x8) == 0 ? false : true);packet.put(Packet.TCP_FLAG_RST, (flags & 0x4) == 0 ? false : true);packet.put(Packet.TCP_FLAG_SYN, (flags & 0x2) == 0 ? false : true);packet.put(Packet.TCP_FLAG_FIN, (flags & 0x1) == 0 ? false : true);
tcpOrUdpHeaderSize = UDP_HEADER_SIZE;if (ipProtocolHeaderVersion == 4) {int cksum = getUdpChecksum(packetData, ipStart, ipHeaderLen);if (cksum >= 0)packet.put(Packet.UDP_SUM, cksum);}int udpLen = getUdpLength(packetData, ipStart, ipHeaderLen);packet.put(Packet.UDP_LENGTH, udpLen);
基于分布式消息隊列Kafka作為網絡流量數據的中間臨時緩存,通過FlinkKafkaConsumer進行網絡流數據的解析,這里我們自定義了PcapResover的解析器,使用自定義的解序列化函數PcapDataDeSerializer。
Kafka Producer,負責轉發已采集的網絡流量,這里配置使用了Kafka內部的序列化類
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());this.consumer = new FlinkKafkaConsumer<>(this.topic,(KafkaDeserializationSchema)new PcapDataDeSerializer(Object.class),props);public class PcapDataDeSerializer implements KafkaDeserializationSchema<Object> {private static final Logger log= LoggerFactory.getLogger(PcapDataDeSerializer.class);private static final long serialVersionUID = 1L;private Class<Object> clazz;public PcapDataDeSerializer(Class<Object> clazz) {this.clazz=clazz;}List<Packet> packetList = new ArrayList<>();@Overridepublic boolean isEndOfStream(Object nextElement) {return false;}@Overridepublic Object deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));DataInputStream dataInputStream=new DataInputStream((InputStream) in);PcapReader reader = new PcapReader(dataInputStream);for (Packet packet : reader) {packetList.add(packet);}log.info("finish deserialize pcap data ,"+record.key()+" , topic is "+record.topic()+", "+"partition is "+record.partition()+" , "+" offset is " +record.offset());return JSON.toJSON(packetList);}@Overridepublic TypeInformation<Object> getProducedType() {return TypeExtractor.getForClass(this.clazz);}}
PcapDataDeSerializer主要實現KafkaDeserializationSchema<Object>中的deserialze即可,在這個函數中,會解析網絡流量,并解析的網絡流量封裝成Pcaket List對象中,進行返回。
KafkaConsumer的創建使用自定義解序列化函數,主要是為了根據1、2 部分對于Pcap網絡流量格式的分析,解析網絡流量,并封裝成數據楨。
基于以上創建的FlinkKafkaConsumer,可以配置Flink Stream DAG,DataStreamSouce ->flatMap->Map->Stream<Frame>
DataStreamSource<Object> stream =executionEnvironment.addSource(this.consumer);log.info("start to build pcap dataStream DAG graph , transform packet into frame stream, " +"and default parallelism is 4 !");return stream.flatMap(new FrameFlatMap()).map(new FrameMapFunction()).setParallelism(4);
這里其實返回的是DataStream<Frame>,也就是說,我們將原始網絡流量解析,最后按照數據楨的方式輸出數據流,以便與進行數據分析。接下來,為了基于Stream sql做一些數據分析,其實就可以將DataStream注冊成臨時表視圖,然后使用類sql的語法進行實時分析了。
聚合統計10s的窗口內,目的mac地址的計數。當然這里sql的表達方式很多,而且表達能力足夠強大。可以根據不同的業務訴求,進行不同的分析。
aggregationSql = "select dstMac,count(1) as c from " + KafkaProperties.FRAME_VIEW_NAME +" group by tumble(PROCTIME() ,interval '10' SECOND) " +", dstMac";
之后就是進行sink了,完成DAG 構建完成,Excute提交任務到集群。
Table result = streamTableEnvironment.sqlQuery(sql);DataStream<Row> resultData = streamTableEnvironment.toAppendStream(result, Row.class);resultData.print();
總結一下,基本流程如下圖所示:
主要通過配置FlinkKafkaConsumer,實現PcapDataDesrializer負責對Pcap數據包中的Frame進行反序列化處理和解析,形式基于Frame的流數據,之后通過自定義FlatMapFunction、MapFunction函數對流數據進行處理和封裝成為原始派生流DataStream<Frame>。
以上是“如何實現基于Flink實時數據處理”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。