作者:牛肉圓粉不加蔥
本文由 簡書 牛肉圓粉不加蔥 授權發布,版權所有歸作者,轉載請聯系作者!
前文揭開 Spark Streaming神秘面紗③ – 動態生成 job 我們分析了 JobScheduler 是如何動態為每個 batch生成 jobs,本文將說明這些生成的 jobs 是如何被提交的。
在 JobScheduler 生成某個 batch 對應的 Seq[Job] 之后,會將 batch 及 Seq[Job] 封裝成一個 JobSet 對象,JobSet 持有某個 batch 內所有的 jobs,并記錄各個 job 的運行狀態。
之后,調用JobScheduler#submitJobSet(jobSet: JobSet)來提交 jobs,在該函數中,除了一些狀態更新,主要任務就是執行
即,對于 jobSet 中的每一個 job,執行jobExecutor.execute(new JobHandler(job)),要搞懂這行代碼干了什么,就必須了解 JobHandler 及 jobExecutor。
JobHandler
JobHandler 繼承了 Runnable,為了說明與 job 的關系,其精簡后的實現如下:
JobHandler#run 方法主要執行了 job.run(),該方法最終將調用到 揭開Spark Streaming神秘面紗③ – 動態生成 job 中的『生成該 batch 對應的 jobs的Step2 定義的 jobFunc』,jonFunc 將提交對應 RDD DAG 定義的 job。
JobExecutor
知道了 JobHandler 是用來執行 job 的,那么 JobHandler 將在哪里執行 job 呢?
答案是jobExecutor,jobExecutor為 JobScheduler 成員,是一個線程池,在JobScheduler 主構造函數中創建,如下:
JobHandler 將最終在 線程池jobExecutor 的線程中被調用,jobExecutor的線程數可通過spark.streaming.concurrentJobs配置,默認為1。若配置多個線程,就能讓多個 job 同時運行,若只有一個線程,那么同一時刻只能有一個 job 運行。
以上,即 jobs 被執行的邏輯。
End.