您好,登錄后才能下訂單哦!
本篇內容主要講解“如何將數據從Web服務處理到MongoDB中”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何將數據從Web服務處理到MongoDB中”吧!
如何創建一個使用Web服務數據并將其插入MongoDB數據庫的Spring Batch應用程序。
閱讀本文的開發人員必須熟悉Spring Batch(示例)和MongoDB。
Mongo數據庫部署在MLab中。請按照本快速入門中的步驟操作。
批處理應用程序部署在Heroku PaaS中。詳情 請看這里。
IDE STS或IntelliJ或Eclipse。
Java 8 JDK。
注意:批處理也可以在本地運行。
全局場景步驟是:
從Web服務讀取數據,在這種情況下:https://sunrise-sunset.org/api
獲取城市列表的坐標,然后調用API以讀取日出和日落日期時間。
2.處理數據并提取業務數據
收集數據的業務處理
3.在MongoDB中插入已處理的數據
將處理過的數據保存為mongo文檔
輸入:本地文件中JSON格式的城市數據列表,如下所示:
[
{
“名字”:“Danemark”,
“城市”:[
{
“名字”:“Copenhague”,
“lat”:55.676098,
“lng”:12.568337,
“timeZone”:“CET”
},
{
“名字”:“奧胡斯”,
“lat”:56.162939,
“lng”:10.203921,
“timeZone”:“CET”
},
{
“名字”:“歐登塞”,
“lat”:55.39594,
“lng”:10.38831,
“timeZone”:“CET”
},
{
“名字”:“奧爾堡”,
“lat”:57.046707,
“lng”:9.935932,
“timeZone”:“CET”
}
]
}
]
我們的場景從本地json文件獲取輸入數據。映射bean如下:
國豆:
導入 java。io。可序列化 ;
導入 java。util。清單 ;
進口 com。fastxml。杰克遜。注釋。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown = true)
public class BCountry 實現 Serializable {
private static final long serialVersionUID = 1L ;
私有 字符串 名稱 ;
私人 名單< BCity > 城市 ;
public BCountry(){
super();
}
public BCountry(String name,List < BCity > cities){
super();
這個。name = name ;
這個。城市 = 城市 ;
}
public BCountry(String name){
super();
這個。name = name ;
}
public String getName(){
返回 名稱 ;
}
public void setName(String name){
這個。name = name ;
}
public List < BCity > getCities(){
返回 城市 ;
}
public void setCities(List < BCity > cities){
這個。城市 = 城市 ;
}
@覆蓋
public int hashCode(){
final int prime = 31 ;
int result = 1 ;
結果 = 黃金 * 結果 +((城市 == 空)? 0:城市。的hashCode());
結果 = 黃金 * 結果 +((名稱 == 空)? 0:名稱。的hashCode());
返回 結果 ;
}
@覆蓋
public boolean equals(Object obj){
if(this == obj)
返回 true ;
if(obj == null)
返回 虛假 ;
如果(的getClass()!= OBJ。的getClass())
返回 虛假 ;
BCountry other =(BCountry)obj ;
if(cities == null){
如果(其他。城市 != 空)
返回 虛假 ;
} 否則 如果(!城市。平等(等。城市))
返回 虛假 ;
if(name == null){
如果(其他。名字 != 空)
返回 虛假 ;
} 否則 如果(!名字。平等(其它。名))
返回 虛假 ;
返回 true ;
}
@覆蓋
public String toString(){
返回 “BCountry [name =” + name + “,cities =” + cities + “]” ;
}
}
和城市豆:
導入 java。io。可序列化 ;
進口 com。fastxml。杰克遜。注釋。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown = true)
公共 類 BCity 實現 Serializable {
private static final long serialVersionUID = 1L ;
private String name,timeZone ;
私人 雙 拉特,lng ;
public BCity(){
super();
}
public BCity(String name,String timeZone,double lat,double lng){
super();
這個。name = name ;
這個。timeZone = timeZone ;
這個。lat = lat ;
這個。lng = lng ;
}
public String getName(){
返回 名稱 ;
}
public void setName(String name){
這個。name = name ;
}
public String getTimeZone(){
返回時 區 ;
}
public void setTimeZone(String timeZone){
這個。timeZone = timeZone ;
}
public double getLat(){
返回 緯度 ;
}
public void setLat(double lat){
這個。lat = lat ;
}
public double getLng(){
返回 lng ;
}
public void setLng(double lng){
這個。lng = lng ;
}
@覆蓋
public String toString(){
返回 “BCity [name =” + name + “,timeZone =” + timeZone + “,lat =” + lat + “,lng =” + lng + “]” ;
}
@覆蓋
public int hashCode(){
final int prime = 31 ;
int result = 1 ;
長 溫度 ;
temp = Double。doubleToLongBits(lat);
result = prime * result +(int)(temp ^(temp >>> 32));
temp = Double。doubleToLongBits(lng);
result = prime * result +(int)(temp ^(temp >>> 32));
結果 = 黃金 * 結果 +((名稱 == 空)? 0:名稱。的hashCode());
結果 = 素 * 結果 +((的timeZone == 空)? 0:的timeZone。的hashCode());
返回 結果 ;
}
@覆蓋
public boolean equals(Object obj){
if(this == obj)
返回 true ;
if(obj == null)
返回 虛假 ;
如果(的getClass()!= OBJ。的getClass())
返回 虛假 ;
BCity other =(BCity)obj ;
如果(雙。doubleToLongBits的(LAT)!= 雙。doubleToLongBits的(其他的。LAT))
返回 虛假 ;
如果(雙。doubleToLongBits的(LNG)!= 雙。doubleToLongBits的(其他的。LNG))
返回 虛假 ;
if(name == null){
如果(其他。名字 != 空)
返回 虛假 ;
} 否則 如果(!名字。平等(其它。名))
返回 虛假 ;
if(timeZone == null){
如果(其他。的timeZone != 空)
返回 虛假 ;
} 否則 如果(!的timeZone。平等(其它。的timeZone))
返回 虛假 ;
返回 true ;
}
}
批量閱讀器實現@LineMapper。您可以使讀者適應我們的數據源(示例):
導入 java。util。清單 ;
進口 組織。彈簧框架。批次。項目。檔案。LineMapper ;
進口 com。ahajri。批次。豆子。BCountry ;
進口 com。fastxml。杰克遜。數據綁定。ObjectMapper ;
進口 com。fastxml。杰克遜。數據綁定。類型。CollectionType ;
公共 類 BCountryJsonLineMapper 實現了 LineMapper < List < BCountry >> {
private final ObjectMapper mapper = new ObjectMapper();
@覆蓋
public List < BCountry > mapLine(String line,int lineNumber)throws Exception {
CollectionType collectionType = mapper。getTypeFactory()。constructCollectionType(列表。類,BCountry。類);
返回 映射器。readValue(line,collectionType);
}
}
處理數據批處理時,檢查同一天某個城市的業務處理數據是否已保存在數據庫中。在MongoDB的搜索數據的方式就是在這個詳細的崗位。
ItemProcessor將@BCountry對象轉換為MongoDB Document對象。該過程詳述如下:
public class BCountryPrayTimeEventItemProcessor 實現 ItemProcessor < List < BCountry >,List < Document >> {
private static final String EVENTS_COLLECTION_NAME = “event” ;
private static final Logger LOG = LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。類);
@Autowired
private PrayTimeService prayTimeService ;
@Autowired
private CloudMongoService cloudMongoService ;
@覆蓋
public List < Document > 進程(List < BCountry > items)拋出 Exception {
final List < Document > docs = new ArrayList <>();
物品。stream()。forEach(item - > {
final String countryName = item。getName();
項目。getCities()。stream()。forEach(c - > {
final Document prayTimeCityEventDoc = new Document();
//循環城市并為今天提取祈禱時間
final String cityName = c。getName();
final String cityTimeZone = c。getTimeZone();
final double lat = c。getLat();
final double lng = c。getLng();
final LocalDateTime nowOfCity = LocalDateTime。現在(了zoneid。的(cityTimeZone));
final QueryParam [] queryParams = new QueryParam [ 5 ];
queryParams [ 0 ] = 新 QueryParam(“CITY_NAME” ,OperatorEnum。EQ。名稱(),的cityName);
queryParams [ 1 ] = 新 QueryParam(“EVENT_TYPE” ,OperatorEnum。EQ。名稱(),事件類型。PRAY_TIME。名稱());
queryParams [ 2 ] = 新 QueryParam(“月”,OperatorEnum。EQ。名稱(),nowOfCity。getMonthValue());
queryParams [ 3 ] = 新 QueryParam(“DAY_OF_MONTH” ,OperatorEnum。EQ。名稱(),nowOfCity。getDayOfMonth());
queryParams [ 4 ] = 新 QueryParam(“COUNTRY_NAME” ,OperatorEnum。EQ。名稱(),國家名稱);
List < Document > foundEvents = null ;
嘗試 {
foundEvents = cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);
} catch(BusinessException e1){
記錄。錯誤(“====>未找到城市祈禱時間” + 的cityName + “對” + nowOfCity。getDayOfMonth()+ “/”
+ nowOfCity。getMonthValue());
}
嘗試 {
如果(CollectionUtils。的isEmpty(foundEvents)){
//祈禱時間尚未創建
prayTimeCityEventDoc。put(“country_name”,countryName);
prayTimeCityEventDoc。put(“city_name”,cityName);
prayTimeCityEventDoc。把(“EVENT_TYPE” ,事件類型。PRAY_TIME。名稱());
prayTimeCityEventDoc。把(“復發”,RecurringEnum。YEARLY。名稱());
prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());
prayTimeCityEventDoc。把(“DAY_OF_MONTH” ,nowOfCity。getDayOfMonth());
prayTimeCityEventDoc。put(“lat”,lat);
prayTimeCityEventDoc。put(“lng”,lng);
prayTimeCityEventDoc。把(“CREATION_DATE” ,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));
final Map < String,Object > prayInfos = prayTimeService。getPrayTimeByLatLngDate(lat,lng,
日期。從(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);
prayTimeCityEventDoc。把(“pray_infos” ,文件。解析(新 GSON()的toJSON(prayInfos)));
docs。add(prayTimeCityEventDoc);
} else {
記錄。信息(字符串。格式(“====>祈禱的時間已經存在的城市:%S,月:%d,日:%d” ,
cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));
}
} catch(BusinessException e){
記錄。錯誤(“計算祈禱時間時出問題:”,e);
拋出 新的 RuntimeException(e);
}
});
});
返回 文檔 ;
}
}
批量配置類:
@組態
@EnableBatchProcessing
@EnableScheduling
公共 類 BatchConfiguration {
private static final String SCANDINAVIAN_COUNTRIES_JSON_FILE = “scandinavian-countries.json” ;
private static final String EVENT_COLLECTION_NAME = “event_collection” ;
private static final Logger LOG = LoggerFactory。getLogger(BatchConfiguration。類);
@Autowired
private JobBuilderFactory jobBuilderFactory ;
@Autowired
private StepBuilderFactory stepBuilderFactory ;
@Autowired
私有的 MLabMongoService mlabMongoService ;
@豆
public ResourcelessTransactionManager transactionManager(){
返回 新的 ResourcelessTransactionManager();
}
@豆
public MapJobRepositoryFactoryBean mapJobRepositoryFactory(ResourcelessTransactionManager txManager)
拋出 異常 {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(txManager);
工廠。afterPropertiesSet();
返回 工廠 ;
}
@豆
public JobRepository jobRepository(MapJobRepositoryFactoryBean factory)拋出 異常 {
return(JobRepository)工廠。getObject();
}
private SimpleJobLauncher jobLauncher ;
@豆
public SimpleJobLauncher jobLauncher(JobRepository jobRepository){
jobLauncher。setJobRepository(jobRepository);
return jobLauncher ;
}
@PostConstruct
private void initJobLauncher(){
jobLauncher = new SimpleJobLauncher();
}
@豆
FlatFileItemReader < List < BCountry >> reader(){
FlatFileItemReader < List < BCountry >> reader = new FlatFileItemReader <>();
讀者。setName(“scandinaviandCountriesReader”);
讀者。setResource(new ClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));
讀者。setLineMapper(new BCountryJsonLineMapper());
回報 讀者 ;
}
@豆
public ItemWriter < List < Document >> writer(){
返回 新的 ItemWriter < List < Document >>(){
@覆蓋
public void write(List <? extends List < Document >> items)拋出 Exception {
嘗試 {
如果(!CollectionUtils。的isEmpty(項目)&& 項目。大小()> 0){
List < Document > flatDocs = items。stream()。flatMap(List :: stream)。收集(收藏家。toList());
mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);
} else {
記錄。警告(“沒有事件可以救......”);
}
} catch(BusinessException e){
拋出 新的 RuntimeException(e);
}
}
};
}
@豆
public BCountryTimeEventItemProcessor processor(){
返回 新的 BCountryTimeEventItemProcessor();
}
@豆
public Job scandvTimeJob(){
返回 jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(new RunIdIncrementer())。流程(step1())。結束()
。build();
}
@豆
public Step step1(){
返回 stepBuilderFactory。得到(“step1”)。< List < BCountry >,List < Document >> chunk(10)。讀者(讀者())
。處理器(處理器())。作家(作家())。build();
}
// end :: jobstep []
//每天午夜15分鐘
@Scheduled(cron = “0 15 0 * * *”)
public void startScandvEventTimeJob()throws Exception {
記錄。info(“====>工作開始時間:” + 新 日期());
JobParameters param = new JobParametersBuilder()。addString(“作業ID” ,字符串。的valueOf(系統。的currentTimeMillis()))
。toJobParameters();
JobExecution 執行 = jobLauncher。run(scandvPrayTimeJob(),param);
記錄。信息(“====>工作完成了狀態:” + 執行。的getStatus());
}
}
部署de Batch到Heroku:
git add。
git commit -m “Deploy Batch”
git push heroku master
注意:要禁用默認批量啟動,請將此添加到application.yml
s p r i n g:
b a t c h:
j o b:
? ? 一個b 升é d:?F 一升小號?
到此,相信大家對“如何將數據從Web服務處理到MongoDB中”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。