91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Pulsar IO 中怎么調用Schema

發布時間:2021-06-24 15:57:57 來源:億速云 閱讀:186 作者:Leah 欄目:大數據

這篇文章給大家介紹Pulsar IO 中怎么調用Schema ,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

Schema 是一種描述數據的數據   。例如,數據庫中表的信息和字段類型等都是 Schema。Pulsar 對 Schema 也有比較好的支持。    


>>> Schema 簡單應用 <<<  

 
在使用 pub/sub 生產和消費消息時,可以通過以下代碼使用 Schema:  
     
   
   
   
public class SensorReading {              public float temperature;          
             public SensorReading(float temperature) {                  this.temperature = temperature;              }          
             // A no-arg constructor is required              public SensorReading() {              }          
             public float getTemperature() {                  return temperature;              }          
             public void setTemperature(float temperature) {                  this.temperature = temperature;              }          }          Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))              .topic("my-topic")              .create();          Consumer consumer = client.newConsumer(JSONSchema.of(SensorReading.class))                  .topic("my-topic")                  .subscriptionName("my-subscription")                  .subscribe();      


通過以上操作,生產者和消費者可以識別出關于 SensorReading 這個類的含義。這是 Schema 在客戶端的應用,也是比較普遍的使用方法。  

 
前文已經提到,Source 和 Sink 是對 pub/sub 的封裝,因此,Schema 的應用也是基于以上原理。以下為詳細說明。  

 
>>> Source 中的 Schema <<<  

 
在內建的 Sink 中,實現了一個 Consumer,用于接收從 Pulsar 發來的數據。  
     
   
   
   if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {  
    
        schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);  
    
    } else {  
    
        schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);  
    
    }


getSerdeClassName    會獲取用戶指定的用于序列化與反序列化的類,通過指定 --   custom-serde-inputs    參數,從而構建真正的 Schema。  
     
   
   
   
case NONE:            return (Schema<T>) Schema.BYTES;          
         case AUTO_CONSUME:          case AUTO:            return (Schema<T>) Schema.AUTO_CONSUME();          
         case STRING:            return (Schema<T>) Schema.STRING;          
         case AVRO:            return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());          
         case JSON:            return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());          
         case KEY_VALUE:            return (Schema<T>)Schema.KV_BYTES();          
         case PROTOBUF:            return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap());          }      


關于Pulsar IO 中怎么調用Schema 就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

普格县| 托克逊县| 湖口县| 永年县| 衡水市| 疏附县| 漳州市| 威宁| 哈尔滨市| 宜良县| 辽宁省| 福安市| 加查县| 保亭| 孙吴县| 阳朔县| 莱西市| 阆中市| 宁南县| 加查县| 庆阳市| 晋中市| 平江县| 胶州市| 扶余县| 邛崃市| 界首市| 蓝山县| 灯塔市| 车致| 科技| 吴堡县| 鱼台县| 偏关县| 溧水县| 报价| 宕昌县| 东乌珠穆沁旗| 北海市| 兴海县| 谢通门县|