This article explains in detail how to rewrite nutch2.3's bin/crawl script as a Java class. It should make a practical reference; hopefully you will take something away from reading it.
Since nutch2.8, the old main control class org.apache.nutch.crawl.Crawl is gone; only the corresponding control script bin/crawl remains. Debugging a shell script inside IDEA is awkward, so I picked up some shell scripting and, based on nutch3.3's bin/crawl and bin/nutch scripts, translated bin/crawl into a Java Crawl class that can be debugged in IDEA.
I referred to the Crawl class from nutch2.7 and to nutch3.3's bin/crawl and bin/nutch, following the shell scripts' original structure and logic as closely as possible; where a direct translation was not possible, I made small modifications.
The main business logic lives in the public int run(String[] args) method. The program entry point is main, which calls ToolRunner.run(NutchConfiguration.create(), new Crawl(), args); to execute that run method.
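With the class on the classpath it launches like any other Nutch job: in IDEA, set the program arguments on the run configuration; from a shell, an invocation would look roughly like this (the seed directory, crawl id, and Solr URL are placeholders, assuming bin/nutch accepts a fully qualified class name as its command):

    bin/nutch org.apache.nutch.crawl.Crawl -urls urls -crawlId mycrawl -solr http://localhost:8983/solr -threads 10 -depth 3 -topN 1000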
public void binNutch5j(String jobName, String commandLine, String options) provides the equivalent of the __bin_nutch function in the bin/crawl script.
public int runJob(String jobName, String commandLine, String options) provides the equivalent of the bin/nutch script. Instead of the script's if-else chain (or a switch-case), it uses reflection to create the corresponding job.
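The reflective dispatch boils down to: look the class name up by job name, instantiate it through its no-arg constructor, and hand it to ToolRunner. A minimal standalone sketch of that pattern, assuming the Nutch 2.x and Hadoop jars are on the classpath (the seed directory and crawl id in main are placeholders):

    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.util.NutchConfiguration;

    public class ReflectiveDispatch {
        // Load a Tool implementation by class name and run it via ToolRunner
        public static int dispatch(String className, String[] args) throws Exception {
            Class<?> jobClass = Class.forName(className);
            Tool job = (Tool) jobClass.getConstructor().newInstance();
            // ToolRunner parses generic Hadoop options, then calls job.run(args)
            return ToolRunner.run(NutchConfiguration.create(), job, args);
        }

        public static void main(String[] args) throws Exception {
            // e.g. the "inject" entry from CLASS_MAP
            System.exit(dispatch("org.apache.nutch.crawl.InjectorJob",
                    new String[]{"urls", "-crawlId", "test"}));
        }
    }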
public void preConfig(Configuration conf, String options) sets each job's configuration entries from a string of -D options such as commonOptions.
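The parsing is just a split on the -D separators followed by a split on =; a minimal sketch of that logic (the two property names are ones used later in commonOptions, with example values):

    import org.apache.hadoop.conf.Configuration;

    public class PreConfigDemo {
        static void preConfig(Configuration conf, String options) {
            // Index 0 is the empty string before the first -D, so start at 1
            String[] equations = options.split("\\s*-D\\s+");
            for (int i = 1; i < equations.length; i++) {
                String[] pair = equations[i].split("=");
                conf.set(pair[0], pair[1]);
            }
        }

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            preConfig(conf, "-D mapred.reduce.tasks=2 -D mapred.compress.map.output=true");
            System.out.println(conf.get("mapred.reduce.tasks"));        // prints 2
            System.out.println(conf.get("mapred.compress.map.output")); // prints true
        }
    }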
CLASS_MAP is a static field: a HashMap that records the mapping from each job name to its class name.
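A nice side effect of this design is that supporting another subcommand only takes one more map entry; for example (a purely hypothetical job class, just to show the shape):

    // hypothetical: route "myjob" through the same reflective dispatch
    CLASS_MAP.put("myjob", "org.example.MyCustomJob"); // any Tool implementation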
Originally each job used the batchId parameter, just as the script does, but I ran into this problem:

Gora MongoDb Exception, can't serialize Utf8

It looks like a serialization problem; the bug appears to be fixed in gora-0.6, but my nutch code is on gora-0.5 and I could not upgrade, so I simply dropped the -batchId parameter and used the -all parameter instead, as shown in the snippet below and in the full source.
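Concretely, in each step the batchId argument string gives way to -all; for the fetch step, for example (both lines appear in the source below, the first one commented out):

    // before (fails with the Utf8 serialization error on gora-0.5):
    //String fetch_args = batchId + " -crawlId " + crawlID + " -threads " + threads;
    // after (fetch everything pending instead of a single batch):
    String fetch_args = "-all" + " -crawlId " + crawlID + " -threads " + threads;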
As for upgrading to gora-0.6, I will look into it when I have time.
Rewriting this script taught me the basics of shell scripting, gave me a chance to put the Java reflection I had studied into practice, and left me with a clear picture of nutch's complete crawl flow and main control logic. The gora bug above held me up for several days, and I kept assuming my translation was at fault, so my debugging skills clearly still need work.
The code below is the translation of nutch3.3's bin/crawl and bin/nutch scripts. The Crawl class goes in the org.apache.nutch.crawl package; the source is as follows:
package org.apache.nutch.crawl;

/**
 * Created by brianway on 2016/1/19.
 * @author brianway
 * @site brianway.github.io
 * org.apache.nutch.crawl.Crawl;
 */
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.fetcher.FetcherJob;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Commons Logging imports
//import org.apache.hadoop.fs.*;
//import org.apache.hadoop.mapred.*;
//import org.apache.nutch.util.HadoopFSUtil;
//import org.apache.nutch.util.NutchJob;
//import org.apache.nutch.crawl.InjectorJob;
//import org.apache.nutch.crawl.GeneratorJob;
//import org.apache.nutch.fetcher.FetcherJob;
//import org.apache.nutch.parse.ParserJob;
//import org.apache.nutch.crawl.DbUpdaterJob;
//import org.apache.nutch.indexer.IndexingJob;
//import org.apache.nutch.indexer.solr.SolrDeleteDuplicates;

public class Crawl extends NutchTool implements Tool {

    public static final Logger LOG = LoggerFactory.getLogger(Crawl.class);

    /* Perform complete crawling and indexing (to Solr) given a set of root urls
       and the -solr parameter respectively. More information and usage
       parameters can be found below. */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(NutchConfiguration.create(), new Crawl(), args);
        System.exit(res);
    }

    // Kept so the class compiles (declared abstract in NutchTool)
    @Override
    public Map<String, Object> run(Map<String, Object> args) throws Exception {
        return null;
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.println("Usage: Crawl -urls <urlDir> -crawlId <crawlID> -solr <solrURL> [-threads n] [-depth i] [-topN N]");
            // ("Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>");
            return -1;
        }

        // ------------ check args ---------
        /*
        //!! Translated directly from the script, but it seemed to be missing
        //   parameters, so it is commented out in favor of the parsing below.
        String seedDir = args[1];
        String crawlID = args[2];
        String solrUrl = null;
        int limit = 1;
        if (args.length - 1 == 3) {
            limit = Integer.parseInt(args[3]);
        } else if (args.length - 1 == 4) {
            solrUrl = args[3];
            limit = Integer.parseInt(args[4]);
        } else {
            System.out.println("Unknown # of arguments " + (args.length - 1));
            System.out.println("Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>");
            return -1;
            //"Usage: Crawl <urlDir> -solr <solrURL> [-dir d] [-threads n] [-depth i] [-topN N]"
            //"Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>";
        }
        */
        String seedDir = null;
        String crawlID = null;
        String solrUrl = null;
        int limit = 0;
        long topN = Long.MAX_VALUE;
        int threads = getConf().getInt("fetcher.threads.fetch", 10);

        // The parameter format follows the nutch2.7 Crawl class:
        //   "Usage: Crawl <urlDir> -solr <solrURL> [-dir d] [-threads n] [-depth i] [-topN N]"
        // not the nutch3.3 script:
        //   "Usage: crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>"
        for (int i = 0; i < args.length; i++) {
            if ("-urls".equals(args[i])) {
                seedDir = args[++i];
            } else if ("-crawlId".equals(args[i])) {
                crawlID = args[++i];
            } else if ("-threads".equals(args[i])) {
                threads = Integer.parseInt(args[++i]);
            } else if ("-depth".equals(args[i])) {
                limit = Integer.parseInt(args[++i]);
            } else if ("-topN".equals(args[i])) {
                topN = Long.parseLong(args[++i]);
            } else if ("-solr".equals(args[i])) {
                solrUrl = args[++i];
            } else {
                System.err.println("Unrecognized arg " + args[i]);
                return -1;
            }
        }

        if (StringUtils.isEmpty(seedDir)) {
            System.out.println("Missing seedDir : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }
        if (StringUtils.isEmpty(crawlID)) {
            System.out.println("Missing crawlID : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }
        if (StringUtils.isEmpty(solrUrl)) {
            System.out.println("No SOLRURL specified. Skipping indexing.");
        }
        if (limit == 0) {
            System.out.println("Missing numberOfRounds : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }

        /**
         * MODIFY THE PARAMETERS BELOW TO YOUR NEEDS
         */
        // set the number of slave nodes
        int numSlaves = 1;
        // and the total number of available tasks;
        // sets the Hadoop parameter "mapred.reduce.tasks"
        int numTasks = numSlaves << 1;
        // number of urls to fetch in one iteration (250K per task?)
        //!! topN is used here
        long sizeFetchlist = topN; // numSlaves * 5;
        // time limit for fetching
        int timeLimitFetch = 180;
        // Adds <days> to the current time to facilitate crawling urls already
        // fetched sooner than db.default.fetch.interval.
        int addDays = 0;

        // note that some of the options listed here could be set in the
        // corresponding hadoop site xml param file
        String commonOptions = "-D mapred.reduce.tasks=" + numTasks
                + " -D mapred.child.java.opts=-Xmx1000m"
                + " -D mapred.reduce.tasks.speculative.execution=false"
                + " -D mapred.map.tasks.speculative.execution=false"
                + " -D mapred.compress.map.output=true ";
        preConfig(getConf(), commonOptions);

        // initial injection
        System.out.println("Injecting seed URLs");
        String inject_args = seedDir + " -crawlId " + crawlID;
        binNutch5j("inject", inject_args, commonOptions);

        for (int a = 1; a <= limit; a++) {
            // ----------- generating -------------
            System.out.println("Generating batchId");
            String batchId = System.currentTimeMillis() + "-" + new Random().nextInt(32767);
            System.out.println("Generating a new fetchlist");
            String generate_args = "-topN " + sizeFetchlist + " -noNorm -noFilter -adddays " + addDays
                    + " -crawlId " + crawlID + " -batchId " + batchId;
            //String generate_options = commonOptions;
            int res = runJob("generate", generate_args, commonOptions);
            System.out.println("binNutch5j generate " + generate_args);
            if (res == 0) {
                // new segments generated, continue with this round
            } else if (res == 1) {
                System.out.println("Generate returned 1 (no new segments created)");
                System.out.println("Escaping loop: no more URLs to fetch now");
                break;
            } else {
                System.out.println("Error running:");
                System.out.println("binNutch5j generate " + generate_args);
                System.out.println("Failed with exit value " + res);
                return res;
            }

            // -------- fetching -----------
            System.out.println("Fetching : ");
            //String fetch_args = batchId + " -crawlId " + crawlID + " -threads " + threads;
            String fetch_args = "-all" + " -crawlId " + crawlID + " -threads " + threads;
            String fetch_options = commonOptions + " -D fetcher.timelimit.mins=" + timeLimitFetch;
            // 10 threads
            binNutch5j("fetch", fetch_args, fetch_options);

            // ---------- parsing --------------
            // parsing the batch (skipped when parsing was already done in the fetch step)
            if (!getConf().getBoolean(FetcherJob.PARSE_KEY, false)) {
                System.out.println("Parsing : ");
                // enable record skipping during parsing so that a dodgy document
                // does not fail the whole task
                //String parse_args = batchId + " -crawlId " + crawlID;
                String parse_args = "-all" + " -crawlId " + crawlID;
                String skipRecordsOptions = " -D mapred.skip.attempts.to.start.skipping=2 -D mapred.skip.map.max.skip.records=1";
                binNutch5j("parse", parse_args, commonOptions + skipRecordsOptions);
            }

            // ---------- updatedb ------------
            // updatedb with this batch
            System.out.println("CrawlDB update for " + crawlID);
            //String updatedb_args = batchId + " -crawlId " + crawlID;
            String updatedb_args = "-all" + " -crawlId " + crawlID;
            binNutch5j("updatedb", updatedb_args, commonOptions);

            if (!StringUtils.isEmpty(solrUrl)) {
                System.out.println("Indexing " + crawlID + " on SOLR index -> " + solrUrl);
                String index_args = batchId + " -all -crawlId " + crawlID;
                String index_options = commonOptions + " -D solr.server.url=" + solrUrl;
                binNutch5j("index", index_args, index_options);

                System.out.println("SOLR dedup -> " + solrUrl);
                binNutch5j("solrdedup", solrUrl, commonOptions);
            } else {
                System.out.println("Skipping indexing tasks: no SOLR url provided.");
            }
        }
        return 0;
    }

    /**
     * Equivalent to the __bin_nutch function in bin/crawl.
     * @param jobName the job name, used as the key into CLASS_MAP
     * @param commandLine the job's command-line arguments
     * @param options extra "-D key=value" options
     */
    public void binNutch5j(String jobName, String commandLine, String options) throws Exception {
        int res = runJob(jobName, commandLine, options);
        if (res != 0) {
            System.out.println("Error running:");
            System.out.println(jobName + " " + commandLine);
            System.out.println("Failed with exit value " + res);
            System.exit(res);
        }
    }

    /**
     * Equivalent to the functionality of the bin/nutch script.
     * @param jobName the job name, used as the key into CLASS_MAP
     * @param commandLine the job's command-line arguments
     * @param options extra "-D key=value" options
     * @return the job's exit code
     */
    public int runJob(String jobName, String commandLine, String options) throws Exception {
        // For simplicity, reflection is used here instead of the script's chain
        // of if-elif branches (or a switch-case).
        Configuration conf = NutchConfiguration.create();
        if (!StringUtils.isEmpty(options)) {
            preConfig(conf, options);
        }
        String[] args = commandLine.split("\\s+");
        String className = CLASS_MAP.get(jobName);
        Class<?> jobClass = Class.forName(className);
        Constructor<?> c = jobClass.getConstructor();
        Tool job = (Tool) c.newInstance();
        System.out.println("---------------runJob: " + jobClass.getName() + "----------------------");
        return ToolRunner.run(conf, job, args);
    }

    /**
     * Set each job's configuration entries from "-D key=value" options.
     * @param conf the configuration to populate
     * @param options the option string, e.g. commonOptions
     */
    public void preConfig(Configuration conf, String options) {
        String[] equations = options.split("\\s*-D\\s+");
        System.out.println("options:" + options);
        // i starts from 1, not 0, to skip the empty string "" before the first -D
        for (int i = 1; i < equations.length; i++) {
            String equation = equations[i];
            String[] pair = equation.split("=");
            conf.set(pair[0], pair[1]);
        }
    }

    /**
     * The map storing the jobName -> className mapping relations.
     */
    public static HashMap<String, String> CLASS_MAP = new HashMap<String, String>();

    /**
     * Initialize CLASS_MAP; refer to "bin/nutch".
     */
    static {
        CLASS_MAP.put("inject", "org.apache.nutch.crawl.InjectorJob");
        CLASS_MAP.put("generate", "org.apache.nutch.crawl.GeneratorJob");
        CLASS_MAP.put("fetch", "org.apache.nutch.fetcher.FetcherJob");
        CLASS_MAP.put("parse", "org.apache.nutch.parse.ParserJob");
        CLASS_MAP.put("updatedb", "org.apache.nutch.crawl.DbUpdaterJob");
        CLASS_MAP.put("readdb", "org.apache.nutch.crawl.WebTableReader");
        CLASS_MAP.put("elasticindex", "org.apache.nutch.indexer.elastic.ElasticIndexerJob");
        CLASS_MAP.put("index", "org.apache.nutch.indexer.IndexingJob");
        CLASS_MAP.put("solrdedup", "org.apache.nutch.indexer.solr.SolrDeleteDuplicates");
    }
}
That wraps up "how to rewrite nutch2.3's bin/crawl script as a Java class". Hopefully the content above is of some help and teaches you something new. If you found this article useful, please share it so more people can see it.