91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何分析Spring Batch遠程分區的本地Jar包模式

發布時間:2022-01-17 15:53:31 來源:億速云 閱讀:144 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關如何分析Spring Batch遠程分區的本地Jar包模式,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1 前言

Spring Batch遠程分區對于大量數據的處理非常擅長,它的實現有多種方式,如本地Jar包模式MQ模式Kubernetes模式。這三種模式的如下:

(1)本地Jar包模式:分區處理的worker為一個Java進程,從jar包啟動,通過jvm參數和數據庫傳遞參數;官方提供示例代碼。

(2)MQ模式worker是一個常駐進程,ManagerWorker通過消息隊列來傳遞參數;網上有不少相關示例代碼。

(3)Kubernetes模式workerK8s中的PodManager直接啟動Pod來處理;網上并沒有找到任何示例代碼。

下面將通過代碼來講解第一種模式(本地Jar包模式),其它后續再介紹。

如何分析Spring Batch遠程分區的本地Jar包模式

建議先看下面文章了解一下:

Spring Batch入門:通過例子講解Spring Batch入門,優秀的批處理框架

Spring Batch并行處理介紹:大量數據也不在話下,Spring Batch并行處理四種模式初探

2 代碼講解

本文代碼中,ManagerWorker是放在一起的,在同一個項目里,也只會打一個jar包而已;我們通過profile來區別是manager還是worker,也就是通過Spring Profile實現一份代碼,兩份邏輯。實際上也可以拆成兩份代碼,但放一起更方便測試,而且代碼量不大,就沒有必要了。

2.1 項目準備

2.1.1 數據庫

首先我們需要準備一個數據庫,因為ManagerWorker都需要同步狀態到DB上,不能直接使用嵌入式的內存數據庫了,需要一個外部可共同訪問的數據庫。這里我使用的是H2 Database,安裝可參考:把H2數據庫從jar包部署到Kubernetes,并解決Ingress不支持TCP的問題。

2.1.2 引入依賴

maven引入依賴如下所示:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
  <groupId>com.h3database</groupId>
  <artifactId>h3</artifactId>
  <scope>runtime</scope>
</dependency>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-deployer-local</artifactId>
  <version>2.4.1</version>
</dependency>

<dependency>
  <groupId>org.springframework.batch</groupId>
  <artifactId>spring-batch-integration</artifactId>
</dependency>

spring-cloud-deployer-local用于部署和啟動worker,非常關鍵;其它就是Spring BatchTask相關的依賴;以及數據庫連接。

2.1.3 主類入口

Springboot的主類入口如下:

@EnableTask
@SpringBootApplication
@EnableBatchProcessing
public class PkslowRemotePartitionJar {
    public static void main(String[] args) {
        SpringApplication.run(PkslowRemotePartitionJar.class, args);
    }
}

Springboot的基礎上,添加了Spring BatchSpring Cloud Task的支持。

2.2 關鍵代碼編寫

前面的數據庫搭建和其它代碼沒有太多可講的,接下來就開始關鍵代碼的編寫。

2.2.1 分區管理Partitioner

Partitioner是遠程分區中的核心bean,它定義了分成多少個區、怎么分區,要把什么變量傳遞給worker。它會返回一組<分區名,執行上下文>的鍵值對,即返回Map<String, ExecutionContext>。把要傳遞給worker的變量放在ExecutionContext中去,支持多種類型的變量,如Stringintlong等。實際上,我們不建議通過ExecutionContext來傳遞太多數據;可以傳遞一些標識或主鍵,然后worker自己去拿數據即可。

具體代碼如下:

private static final int GRID_SIZE = 4;
@Bean
public Partitioner partitioner() {
  return new Partitioner() {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

      Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);

      for (int i = 0; i < GRID_SIZE; i++) {
        ExecutionContext executionContext = new ExecutionContext();
        executionContext.put("partitionNumber", i);
        partitions.put("partition" + i, executionContext);
      }

      return partitions;
    }
  };
}

上面分成4個區,程序會啟動4個worker來處理;給worker傳遞的參數是partitionNumber

2.2.2 分區處理器PartitionHandler

PartitionHandler也是核心的bean,它決定了怎么去啟動worker,給它們傳遞什么jvm參數(跟之前的ExecutionContext傳遞不一樣)。

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception {

  Resource resource = this.resourceLoader.getResource(workerResource);

  DeployerPartitionHandler partitionHandler =
    new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);

  List<String> commandLineArgs = new ArrayList<>(3);
  commandLineArgs.add("--spring.profiles.active=worker");
  commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
  commandLineArgs.add("--spring.batch.initializer.enabled=false");

  partitionHandler
    .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
  partitionHandler
    .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
  partitionHandler.setMaxWorkers(2);
  partitionHandler.setApplicationName("PkslowWorkerJob");

  return partitionHandler;
}

上面代碼中:

resourceworkerjar包地址,表示將啟動該程序;

workerStepworker將要執行的step

commandLineArgs定義了啟動workerjvm參數,如--spring.profiles.active=worker

environmentmanager的系統環境變量,可以傳遞給worker,當然也可以選擇不傳遞;

MaxWorkers是最多能同時啟動多少個worker,類似于線程池大小;設置為2,表示最多同時有2個worker來處理4個分區。

2.2.3 Manager和Worker的Batch定義

完成了分區相關的代碼,剩下的就只是如何定義ManagerWorker的業務代碼了。

Manager作為管理者,不用太多業務邏輯,代碼如下:

@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
  Random random = new Random();
  return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
    .start(step1(partitionHandler))
    .build();
}

@Bean
public Step step1(PartitionHandler partitionHandler) throws Exception {
  return this.stepBuilderFactory.get("step1")
    .partitioner(workerStep().getName(), partitioner())
    .step(workerStep())
    .partitionHandler(partitionHandler)
    .build();
}

Worker主要作用是處理數據,是我們的業務代碼,這里就演示一下如何獲取Manager傳遞過來的partitionNumber

@Bean
public Step workerStep() {
  return this.stepBuilderFactory.get("workerStep")
    .tasklet(workerTasklet(null, null))
    .build();
}

@Bean
@StepScope
public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {
  return new Tasklet() {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
      Thread.sleep(6000); //增加延時,查看效果,通過jps:在jar情況下會新起java進程
      System.out.println("This tasklet ran partition: " + partitionNumber);
     
      return RepeatStatus.FINISHED;
    }
  };
}

通過表達式@Value("#{stepExecutionContext['partitionNumber']}") 獲取Manager傳遞過來的變量;注意要加注解@StepScope

3 程序運行

因為我們分為ManagerWorker,但都是同一份代碼,所以我們先打包一個jar出來,不然manager無法啟動。配置數據庫和Workerjar包地址如下:

spring.datasource.url=jdbc:h3:tcp://localhost:9092/test
spring.datasource.username=pkslow
spring.datasource.password=pkslow
spring.datasource.driver-class-name=org.h3.Driver

pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

執行程序如下:

如何分析Spring Batch遠程分區的本地Jar包模式

可以看到啟動了4次Java程序,還給出日志路徑。

通過jps命令查看,能看到一個Manager進程,還有兩個worker進程:

如何分析Spring Batch遠程分區的本地Jar包模式

4 復雜變量傳遞

前面講了Manager可以通過ExecutionContext傳遞變量,如簡單的Stringlong等。但其實它也是可以傳遞復雜的Java對象的,但對應的類需要可序列化,如:

import java.io.Serializable;

public class Person implements Serializable {
    private Integer age;
    private String name;
    private String webSite;
  //getter and setter
}

Manager傳遞:

executionContext.put("person", new Person(0, "pkslow", "www.pkslow.com"));

Worker接收:

@Value("#{stepExecutionContext['person']}") Person person

上述就是小編為大家分享的如何分析Spring Batch遠程分區的本地Jar包模式了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

林甸县| 遵义县| 藁城市| 云阳县| 南皮县| 长垣县| 璧山县| 盐池县| 方山县| 万全县| 彩票| 西昌市| 永修县| 崇文区| 民县| 临泽县| 利津县| 马龙县| 报价| 温州市| 洛阳市| 东乡族自治县| 兴海县| 鞍山市| 渑池县| 铅山县| 西华县| 泗水县| 太和县| 额济纳旗| 崇礼县| 盐山县| 八宿县| 蒲江县| 含山县| 南投县| 新田县| 呼伦贝尔市| 宁陕县| 常宁市| 玛纳斯县|