您好,登錄后才能下訂單哦!
這篇文章主要介紹“flink中zk引起的重啟怎么解決”,在日常操作中,相信很多人在flink中zk引起的重啟怎么解決問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”flink中zk引起的重啟怎么解決”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
最近用flink on k8s跑程序的過程中,發現某個時刻經常導致程序重啟,定時任務每天加載一次緩存,該緩存有大量數據,加載時長需要60-90s左右。這個定時任務經常會導致k8s重啟程序,使其極不穩定,于是各種調優。
懷疑可能是算子的sender和receiver之間因為加載緩存導致某種通信不可達,默認的心跳時間是50s,于是修改參數:heartbeat.timeout: 180000,heartbeat.interval: 20000。
jobmanager和taskmanager是用akka通信,修改參數akka.ask.timeout: 240s。
這些操作之后,偶爾還是會在加載緩存的時候發現異常,日志截取如下
2020-10-16 17:05:05,939 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f 2020-10-16 17:05:05,948 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f, closing socket connection and attempting reconnect 2020-10-16 17:05:07,609 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED 2020-10-16 17:05:07,611 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper. 2020-10-16 17:05:07,612 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 1bb3b7bdcfbc39cf760064ed9736ea80 with leader id bed26e07640e5e79197e468c85354534 lost leadership. 2020-10-16 17:05:07,613 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper. 2020-10-16 17:05:07,614 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 1bb3b7bdcfbc39cf760064ed9736ea80. 2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762). 2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762) switched from RUNNING to FAILED. org.apache.flink.util.FlinkException: JobManager responsible for 1bb3b7bdcfbc39cf760064ed9736ea80 lost the leadership. at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1274) at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:155) at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1698) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: Job leader for job id 1bb3b7bdcfbc39cf760064ed9736ea80 lost leadership. ... 22 more
再經過調查發現,這個跟zk有關系,zk在切換leader或者遇到網絡波動之類的,會觸發SUSPENDED狀態,這個狀態,會導致lost the leadership錯誤,而遇到這個錯誤,k8s直接就重啟程序。其實訪問zk還是正常的。 再經過一系列調查,這種問題別人早就遇到,還改了代碼,就是flink官方沒合并代碼。調查的過程不表,有用的鏈接如下
https://www.cnblogs.com/029zz010buct/p/10946244.html
這個有用的是升級curator包, flink用的是2.12.0,暫時沒去操作,里面提到的SessionConnectionStateErrorPolicy是在4.x版本的,應該還是要去編譯部分代碼。
https://github.com/apache/flink/pull/9066 https://issues.apache.org/jira/browse/FLINK-10052
這個是其他人的解決方案,本人用的也是這個方法。 不把SUSPENDED狀態認為是lost leadership,修改LeaderLatch的handleStateChange方法
case RECONNECTED: { try { if (!hasLeadership.get()) { reset(); } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Could not reset leader latch", e); setLeadership(false); } break; } case LOST: { setLeadership(false); break; }
找到這段代碼之后,自然是找到了flink-shaded-hadoop-2-uber-xxx.jar這個包,在flink1.10的版本,還支持hadoop的這個包,在1.11之后已經不再主動支持,需要的要自己去下載,因為這個包在打鏡像時會特意加上去,所以目標鎖定這個包,重新編譯。簡單說下編譯過程
https://github.com/apache/curator/tree/apache-curator-2.12.0 下載這個版本的源碼,修改curator-recipes下的src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java,修改內容如上所示,打的包是2.12.0。
https://github.com/apache/flink-shaded/tree/release-10.0 下載flink-shaded 1.10版本的源碼,修改flink-shaded-hadoop-2-parent的pom文件,增加exclusion,去掉curator-recipes的依賴,增加自己編譯的curator-recipes。觀察到不去掉依賴,默認是2.7.1版本,應該是這塊代碼好多年沒動過,版本一直停留在2.7.1。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <exclusions> ...省略若干exclusion <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
因為我們用的是2.8.3-10.0版本的,源碼是2.4.1的,修改成<hadoop.version>2.8.3</hadoop.version>
看根目錄的readme.md,在flink-shaded-release-10.0/flink-shaded-hadoop-2-parent目錄運行mvn package -Dshade-sources打包,打包完成之后,用工具反編譯觀察一下,SUSPENDED的代碼確實去掉了,重新打鏡像,跑程序。
到此,關于“flink中zk引起的重啟怎么解決”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。