您好,登錄后才能下訂單哦!
Avro是個支持多語言的數據序列化框架,支持c,c++,c#,python,java,php,ruby,java。他的誕生主要是為了彌補Writable只支持java語言的缺陷。
1 AVRO簡介
很多人會問類似的框架還有Thrift和Protocol,那為什么不使用這些框架,而要重新建一個框架呢,或者說Avro有哪些不同。首先,Avro和其他框架一樣,數據是用與語言無關的schema描述的,不同的是Avro的代碼生成是可選的,schema和數據存放在一起,而schema使得整個數據的處理過程并不生成代碼、靜態數據類型等,為了實現這些,需要假設讀取數據的時候模式是已知的,這樣就會產生緊耦合的編碼,不再需要用戶指定字段標識。
Avro的schema是JSON格式的,而編碼后的數據是二進制格式(當然還有其他可選項)的,這樣對于已經擁有JSON庫的語言可以容易實現。
Avro還支持擴展,寫的schema和讀的schema不一定要是同一個,也就是說兼容新舊schema和新舊客戶端的讀取,比如新的schema增加了一個字段,新舊客戶端都能讀舊的數據,新客戶端按新的schema去寫數據,當舊的客戶端讀到新的數據時可以忽略新增的字段。
Avro還支持datafile文件,schema寫在文件開頭的元數據描述符里,Avro datafile支持壓縮和分割,這就意味著可以做Mapreduce的輸入。
2 Avro Schemas
2.1 Schema 定義
Schema是JSON格式的,包括下面三種形式:
1.JSON string類型,主要是原生類型
2.JSON 數組,主要是union
3.JSON 對象,格式:
{"type": "typeName" ...attributes...}
包括除原生類型和union以外的其他類型,attributes可以包括avro未定義的屬性,這些屬性并不會影響數據的序列化。
2.2 原生類型
總共8種原生類型null,boolean,int,long,float,double,bytes,strings.
1.原生類型不需要attributes
2.可以通過type指定“string” 和 {"type":"string"}是等同的
3.不同語言的實現是不同的,比如double類型,在C,C++和java里就是double,而在Python里是float,在Ruby里是Float.
2.3 復合類型
1、records
records一般是序列化數據的最終展現單元,而且可以自己嵌套。
{ "type":"record", "name":"LongList", "aliases":["LinkedLongs"], "fields" : [ {"name":"value", "type": "long"}, {"name":"next", "type": ["LongList", "null"]} ] }
2、enums,枚舉。
{ "type": "enum", "name":"Suit", "symbols" :["SPADES", "HEARTS", "DIAMONDS","CLUBS"] }
3、arrays,數組。
{"type": "array", "items":"string"}
4、maps
map,keys必須是string,所以這里只指定了values的類型
{"type": "map", "values": "long"}
5、unions
不能包含兩個或者兩個以上沒有name屬性的相同類型
["string", "null"]
6、fixed
size指定每個值占用多少個字節
{"type": "fixed", "size": 16,"name": "md5"}
2.4 三種mapping
generic mapping
針對一種語言來說可能有不同的mapping,但是所有語言必須支持動態mapping,在處理之前并不知道schema
specific mapping
java和C++都可以事先生成源代碼,比generic mapping有更多domain-oriented的api
reflect mapping
使用反射將avro類型轉換成java類型,但這種mapping比前兩種都慢,故棄用。
3 Avro序列化與反序列化
3.1 準備工作
將一下schema保存成文件StringPair.avsc,放在src/test/resources目錄下。
{ "type":"record", "name":"StringPair", "doc":"A pair ofstrings", "fields":[ {"name":"left","type":"string"}, {"name":"right","type":"string"} ] }
引入最新版本的avro時要主要,最新的avro包為1.7.4,依賴org.codehaus.jackson:jackson-core-asl:1.8.8包,但是maven庫中已經沒有該版本,所以要換成其他版本。
<dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.9</version> </dependency>
如果你用的是1.0.4版本的hadoop(或者其他版本),依賴于jackson-mapper-asl,如果與jackson-core-asl版本不一致就會產生找不到方法等異常你需要入引入相同版本。
<dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.9</version> </dependency>
3.2 generic方式
package com.sweetop.styhadoop; import junit.framework.Assert; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.*; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; /** * Created with IntelliJ IDEA. * User: lastsweetop * Date: 13-8-5 * Time: 下午7:59 * To change this template use File| Settings | File Templates. */ public class TestGenericMapping { @Test public void test() throwsIOException { //將schema從StringPair.avsc文件中加載 Schema.Parser parser = newSchema.Parser(); Schema schema =parser.parse(getClass().getResourceAsStream("/StringPair.avsc")); //根據schema創建一個record示例 GenericRecord datum = newGenericData.Record(schema); datum.put("left","L"); datum.put("right","R"); ByteArrayOutputStream out =new ByteArrayOutputStream(); //DatumWriter可以將GenericRecord變成edncoder可以理解的類型 DatumWriter<GenericRecord> writer = newGenericDatumWriter<GenericRecord>(schema); //encoder可以將數據寫入流中,binaryEncoder第二個參數是重用的encoder,這里不重用,所用傳空 Encoder encoder =EncoderFactory.get().binaryEncoder(out, null); writer.write(datum,encoder); encoder.flush(); out.close(); DatumReader<GenericRecord> reader=newGenericDatumReader<GenericRecord>(schema); Decoderdecoder=DecoderFactory.get().binaryDecoder(out.toByteArray(),null); GenericRecordresult=reader.read(null,decoder); Assert.assertEquals("L",result.get("left").toString()); Assert.assertEquals("R",result.get("right").toString()); } }
result.get返回的是utf-8格式,需要調用toString方法,才能和字符串一致。
3.3 specific方式
首先使用avro-maven-plugin生成代碼,pom的配置。
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.7.0</version> <executions> <execution> <id>schemas</id> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <includes> <include>StringPair.avsc</include> </includes> <sourceDirectory>src/test/resources</sourceDirectory> <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory> </configuration> </execution> </executions> </plugin>
avro-maven-plugin插件綁定在generate-sources階段,調用mvn generate-sources即可生成源代碼,我們來看下生成的源代碼:
package com.sweetop.styhadoop; /** * Autogenerated by Avro * <p/> * DO NOT EDIT DIRECTLY */ @SuppressWarnings("all") /** A pair of strings */ public class StringPair extendsorg.apache.avro.specific.SpecificRecordBase implementsorg.apache.avro.specific.SpecificRecord { public static finalorg.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"StringPair\",\"doc\":\"Apair ofstrings\",\"fields\":[{\"name\":\"left\",\"type\":\"string\",\"avro.java.string\":\"String\"},{\"name\":\"right\",\"type\":\"string\"}]}"); @Deprecated public java.lang.CharSequence left; @Deprecated public java.lang.CharSequenceright; public org.apache.avro.SchemagetSchema() { return SCHEMA$; } // Used by DatumWriter. Applications should not call. public java.lang.Object get(intfield$) { switch (field$) { case 0: return left; case 1: return right; default: throw neworg.apache.avro.AvroRuntimeException("Bad index"); } } // Used by DatumReader. Applications should not call. @SuppressWarnings(value ="unchecked") public void put(int field$,java.lang.Object value$) { switch (field$) { case 0: left =(java.lang.CharSequence) value$; break; case 1: right =(java.lang.CharSequence) value$; break; default: throw neworg.apache.avro.AvroRuntimeException("Bad index"); } } /** * Gets the value of the 'left'field. */ public java.lang.CharSequencegetLeft() { return left; } /** * Sets the value of the 'left'field. * * @param value the value toset. */ public voidsetLeft(java.lang.CharSequence value) { this.left = value; } /** * Gets the value of the 'right'field. */ public java.lang.CharSequencegetRight() { return right; } /** * Sets the value of the 'right'field. * * @param value the value toset. */ public voidsetRight(java.lang.CharSequence value) { this.right = value; } }
為了兼容之前的版本生成了一組get,put方法,1.6.0后生成添加了getter/setter方法,還有一個與Builder的類,沒什么用已經被我刪掉
schama里的name里可以使用命名空間,如com.sweetop.styhadoop.StringPair,這樣生成的源代碼才會是帶package的。
那我們來看如果使用這個生成的類,和generic方式有什么不同:
package com.sweetop.styhadoop; import junit.framework.Assert; import org.apache.avro.Schema; import org.apache.avro.io.*; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * Created with IntelliJ IDEA. * User: lastsweetop * Date: 13-8-6 * Time: 下午2:19 * To change this template use File| Settings | File Templates. */ public class TestSprecificMapping { @Test public void test() throwsIOException { //因為已經生成StringPair的源代碼,所以不再使用schema了,直接調用setter和getter即可 StringPair datum=newStringPair(); datum.setLeft("L"); datum.setRight("R"); ByteArrayOutputStreamout=new ByteArrayOutputStream(); //不再需要傳schema了,直接用StringPair作為范型和參數, DatumWriter<StringPair> writer=newSpecificDatumWriter<StringPair>(StringPair.class); Encoder encoder=EncoderFactory.get().binaryEncoder(out,null); writer.write(datum,encoder); encoder.flush(); out.close(); DatumReader<StringPair> reader=newSpecificDatumReader<StringPair>(StringPair.class); Decoder decoder=DecoderFactory.get().binaryDecoder(out.toByteArray(),null); StringPairresult=reader.read(null,decoder); Assert.assertEquals("L",result.getLeft().toString()); Assert.assertEquals("R",result.getRight().toString()); } }
同點總結一下:
schema->StringPair.class, GenericRecord->StringPair。
4 AvroDatafile
4.1 datafile組成
datafile的組成如下圖:
datafile分為文件頭是數據塊,如果看圖還是不明白,那么看這個應該會很清楚,datafile文件頭的schema:
{"type": "record", "name":"org.apache.avro.file.Header", "fields" : [ {"name":"magic", "type": {"type": "fixed","name": "Magic", "size": 4}}, {"name":"meta", "type": {"type": "map","values": "bytes"}}, {"name":"sync", "type": {"type": "fixed","name": "Sync", "size": 16}}, ] }
要注意的是16字節的同步標記,這個標記意味著datafile支持隨機讀,并且可以做分割,也意味著可以作為mapreduce的輸入。
DataFileReader可以通過同步標記去隨機讀datafile文件。
void seek(long position) Move to a specific, known synchronization point, one returned fromDataFileWriter.sync() while writing. void sync(long position) Move to the next synchronization point after a position.
4.2 datafile寫操作
以代碼注釋的方式進行講解:
//首先創建一個擴展名為avro的文件(擴展名隨意,這里只是為了容易分辨) File file = new File("data.avro"); //這行和前篇文章的代碼一致,創建一個Generic Record的datum寫入類 DatumWriter<GenericRecord> writer = newGenericDatumWriter<GenericRecord>(schema); //和Encoder不同,DataFileWriter可以將avro數據寫入到文件中 DataFileWriter<GenericRecord>dataFileWriter = new DataFileWriter<GenericRecord>(writer); //創建文件,并且寫入頭信息 dataFileWriter.create(schema,file); //寫datum數據 dataFileWriter.append(datum); dataFileWriter.append(datum); dataFileWriter.close();
4.3 datafile讀操作
以代碼注釋的方式進行講解:
//這行也和前篇文章相同,Generic Record的datum讀取類,有點不一樣的就是這里不需要再傳入schema,因為schema已經包含在datafile的頭信息里:
DatumReader<GenericRecord> reader=newGenericDatumReader<GenericRecord>(); //datafile文件的讀取類,指定文件和datumreader DataFileReader<GenericRecord>dataFileReader=new DataFileReader<GenericRecord>(file,reader); //測試下讀寫的schema是否一致 Assert.assertEquals(schema,dataFileReader.getSchema()); //遍歷GenericRecord for (GenericRecord record : dataFileReader){ System.out.println("left="+record.get("left")+",right="+record.get("right")); }
5 Avro schema兼容
5.1 兼容條件
在實際的應用中,因為應用版本的問題經常遇到讀和寫的schema不相同的情況,幸運的是avro已經提供了相關的解決方案。
下面圖示說明:
5.2 Record兼容
在hadoop的實際應用中,更多是以record的形式進行交互,接下來我們重點講解下record的兼容。
首先從讀寫schema的角度取考慮,讀寫schema的不同無外乎就兩種,讀的schema比寫的schema多了一個field,讀的schema比寫的schema少了一個field,這兩種情況處理起來都很簡單。
先看下寫的schema:
{ "type":"record", "name":"com.sweetop.styhadoop.StringPair", "doc":"A pair ofstrings", "fields":[ {"name":"left","type":"string"}, {"name":"right","type":"string"} ] }
1、增加了field的情況
增加了field后的schema:
{ "type":"record", "name":"com.sweetop.styhadoop.StringPair", "doc":"A pair ofstrings", "fields":[ {"name":"left","type":"string"}, {"name":"right","type":"string"}, {"name":"description","type":"string","default":""} ] }
用增加了field的schema取讀數據。
new GenericDatumReader<GenericRecord>(null, newSchema),第一個參數為寫的schema,第二個參數為讀的schema,
由于讀的是avro datafile,schema已經在文件的頭部指定,所以寫的schema可以忽略掉。
@Test public void testAddField()throws IOException { //將schema從newStringPair.avsc文件中加載 Schema.Parser parser = newSchema.Parser(); Schema newSchema =parser.parse(getClass().getResourceAsStream("/addStringPair.avsc")); File file = new File("data.avro"); DatumReader<GenericRecord> reader = newGenericDatumReader<GenericRecord>(null, newSchema); DataFileReader<GenericRecord> dataFileReader = newDataFileReader<GenericRecord>(file, reader); for (GenericRecord record :dataFileReader) { System.out.println("left=" + record.get("left") +",right=" + record.get("right") + ",description=" +record.get("description")); } }
輸出結果為:
left=L,right=R,description= left=L,right=R,description=
description用默認值空字符串代替。
2、減少了field的情況
減少了field的schema:
{ "type":"record", "name":"com.sweetop.styhadoop.StringPair", "doc":"A pair ofstrings", "fields":[ {"name":"left","type":"string"} ] }
用減少了field的schema取讀取:
@Test public void testRemoveField()throws IOException { //將schema從StringPair.avsc文件中加載 Schema.Parser parser = newSchema.Parser(); Schema newSchema = parser.parse(getClass().getResourceAsStream("/removeStringPair.avsc")); File file = newFile("data.avro"); DatumReader<GenericRecord> reader = newGenericDatumReader<GenericRecord>(null, newSchema); DataFileReader<GenericRecord> dataFileReader = newDataFileReader<GenericRecord>(file, reader); for (GenericRecord record :dataFileReader) { System.out.println("left=" + record.get("left")); } }
輸出結果為:
left=L left=L
刪除的field被忽略掉。
3、新舊版本schema
如果從新舊版本的角度取考慮。
新版本schema比舊版本schema增加了一個字段
1.新版本取讀舊版本的數據,使用新版本schema里新增field的默認值
2.舊版本讀新版本的數據,新版本schema里新增field被舊版本的忽略掉
新版本schema比舊版半schema較少了一個字段
1.新版本讀舊版本的數據,減少的field被新版本忽略掉
2.舊版本讀新版本的數據,舊版本的schema使用起被刪除field的默認值,如果沒有就會報錯,那么升級舊版本。
5.3 別名
別名是另一個用于schema兼容的方法,可以將寫的schema的field名字轉換成讀的schema的field,記住并不是加了aliases字段。
而是將寫的filed的name屬性變為aliases,讀的時候只認name屬性。
來看下加了別名的schema:
{ "type":"record", "name":"com.sweetop.styhadoop.StringPair", "doc":"A pair ofstrings", "fields":[ {"name":"first","type":"string","aliases":["left"]}, {"name":"second","type":"string","aliases":["right"]} ] }
使用別名schema去讀數據,這里不能再用left,right,而要用first,second:
@Test public void testAliasesField()throws IOException { //將schema從StringPair.avsc文件中加載 Schema.Parser parser = newSchema.Parser(); Schema newSchema =parser.parse(getClass().getResourceAsStream("/aliasesStringPair.avsc")); File file = newFile("data.avro"); DatumReader<GenericRecord> reader = newGenericDatumReader<GenericRecord>(null, newSchema); DataFileReader<GenericRecord>dataFileReader = new DataFileReader<GenericRecord>(file, reader); for (GenericRecord record :dataFileReader) { System.out.println("first=" +record.get("first")+",second="+record.get("second")); } }
輸出結果為:
first=L,second=R first=L,second=R
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。