您好,登錄后才能下訂單哦!
前言
最近有一項需求,要定時判斷任務執行條件是否滿足并觸發 Spark 任務,平時編寫 Spark 任務時都是封裝為一個 Jar 包,然后采用 Shell 腳本形式傳入所需參數執行,考慮到本次判斷條件邏輯復雜,只用 Shell 腳本完成不利于開發測試,所以調研使用了 Python 和 Java 分別調用 Spark 腳本的方法。
使用版本為 Python 3.6.4 及 JDK 8
Python
主要使用 subprocess 庫。Python 的 API 變動比較頻繁,在 3.5 之后新增了 run 方法,這大大降低了使用難度和遇見 Bug 的概率。
subprocess.run(["ls", "-l"]) subprocess.run(["sh", "/path/to/your/script.sh", "arg1", "arg2"])
為什么說使用 run 方法可以降低遇見 Bug 的概率呢?
在沒有 run 方法之前,我們一般調用其他的高級方法,即 Older high-level API,比如 call,check_all,或者直接創建 Popen 對象。因為默認的輸出是 console,這時如果對 API 不熟悉或者沒有仔細看 doc,想要等待子進程運行完畢并獲取輸出,使用了 stdout = PIPE
再加上 wait 的話,當輸出內容很多時會導致 Buffer 寫滿,進程就一直等待讀取,形成死鎖。在一次將 Spark 的 log 輸出到 console 時,就遇到了這種奇怪的現象,下邊的腳本可以模擬:
# a.sh for i in {0..9999}; do echo '***************************************************' done
p = subprocess.Popen(['sh', 'a.sh'], stdout=subprocess.PIPE) p.wait()
而 call 則在方法內部直接調用了 wait 產生相同的效果。
要避免死鎖,則必須在 wait 方法調用之前自行處理掉輸入輸出,或者使用推薦的 communicate 方法。 communicate 方法是在內部生成了讀取線程分別讀取 stdout stderr,從而避免了 Buffer 寫滿。而之前提到的新的 run 方法,就是在內部調用了 communicate。
stdout, stderr = process.communicate(input, timeout=timeout)
Java
說完了 Python,Java 就簡單多了。
Java 一般使用 Runtime.getRuntime().exec()
或者 ProcessBuilder 調用外部腳本:
Process p = Runtime.getRuntime().exec(new String[]{"ls", "-al"}); Scanner sc = new Scanner(p.getInputStream()); while (sc.hasNextLine()) { System.out.println(sc.nextLine()); } // or Process p = new ProcessBuilder("sh", "a.sh").start(); p.waitFor(); // dead lock
需要注意的是:這里 stream 的方向是相對于主程序的,所以 getInputStream()
就是子進程的輸出,而 getOutputStream()
是子進程的輸入。
基于同樣的 Buffer 原因,假如調用了 waitFor 方法等待子進程執行完畢而沒有及時處理輸出的話,就會造成死鎖。
由于 Java API 很少變動,所以沒有像 Python 那樣提供新的 run 方法,但是開源社區也給出了自己的方案,如commons exec,或 http://www.baeldung.com/run-shell-command-in-java,或 alvin alexander 給出的方案(雖然不完整)。
// commons exec,要想獲取輸出的話,相比 python 來說要復雜一些 CommandLine commandLine = CommandLine.parse("sh a.sh"); ByteArrayOutputStream out = new ByteArrayOutputStream(); PumpStreamHandler streamHandler = new PumpStreamHandler(out); Executor executor = new DefaultExecutor(); executor.setStreamHandler(streamHandler); executor.execute(commandLine); String output = new String(out.toByteArray());
但其中的思想和 Python 都是統一的,就是在后臺開啟新線程讀取子進程的輸出,防止 Buffer 寫滿。
另一個統一思想的地方就是,都推薦使用數組或 list 將輸入的 shell 命令分隔成多段,這樣的話就由系統來處理空格等特殊字符問題。
參考:
https://dcreager.net/2009/08/06/subprocess-communicate-drawbacks/ https://alvinalexander.com/java/java-exec-processbuilder-process-1 https://www.javaworld.com/article/2071275/core-java/when-runtime-exec—won-t.html
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。