Results 1 to 4 of 4

Thread: creating spring batch FLOW JOB dynamically

  1. #1
    Join Date
    Dec 2011
    Posts
    25

    Default creating spring batch FLOW JOB dynamically

    Hi All,

    I am trying to create the Spring Batch Job dynamically.

    I got a link http://forum.springsource.org/showth...bs-dynamically in this forum describes how to SEQUENTIAL JOB dynamically. It helped me a lot.

    Now I am trying to create SPLIT/FLOW JOB dynamically, but I am not able to create SPLIT/FLOW JOB dynamically.

    Can you help me to create FLOW JOB dynamically. If anyone have created FLOW JOB dynamically please share a sample code of "how to create SPLIT/FLOW JOB dynamically"


    Thanks
    Parag Phatowali
    Last edited by paragflume; Feb 14th, 2013 at 08:14 AM.

  2. #2
    Join Date
    Dec 2011
    Posts
    25

    Default

    Hi All,


    Finally I am able to create Spring Batch Flow Job programmatically / dynamically.

    Here is sample code :
    Code:
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.FlowJob;
    import org.springframework.batch.core.job.flow.JobExecutionDecider;
    import org.springframework.batch.core.job.flow.support.SimpleFlow;
    import org.springframework.batch.core.job.flow.support.StateTransition;
    import org.springframework.batch.core.job.flow.support.state.DecisionState;
    import org.springframework.batch.core.job.flow.support.state.EndState;
    import org.springframework.batch.core.job.flow.support.state.StepState;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.core.step.tasklet.TaskletStep;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.transaction.PlatformTransactionManager;
    
    public class JoinStart {
    
    	public static void main(String[] args) throws Exception {
    		JoinStart s = new JoinStart();
    		s.runShellAction("");
    	}
    
    	public void runShellAction(String command) throws Exception {
    
    
    		ApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
    
    		JobRepository jobRepository = (JobRepository) context.getBean("jobRepository");
    
    		PlatformTransactionManager transactionManager = (PlatformTransactionManager) context.getBean("transactionManager");
    
    		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
    
    		JobParameters jobPerameter = new JobParametersBuilder().addDate("date_param", new java.util.Date()).toJobParameters();
    
    		Tasklet t1 = new TJoin("join1",5);
    		TaskletStep ts1 = new TaskletStep("join1");
    		ts1.setTasklet(t1);
    		ts1.setJobRepository(jobRepository);
    		ts1.setTransactionManager(transactionManager);
    
    		Tasklet t2 = new TJoin("join2",5);
    		TaskletStep ts2 = new TaskletStep("join2");
    		ts2.setTasklet(t2);
    		ts2.setJobRepository(jobRepository);
    		ts2.setTransactionManager(transactionManager);
    
    		Tasklet t3 = new TJoin("join3",5);
    		TaskletStep ts3 = new TaskletStep("join3");
    		ts3.setTasklet(t3);
    		ts3.setJobRepository(jobRepository);
    		ts3.setTransactionManager(transactionManager);
    		
    		Tasklet t4 = new TJoin("join4",5);
    		TaskletStep ts4 = new TaskletStep("join4");
    		ts4.setTasklet(t4);
    		ts4.setJobRepository(jobRepository);
    		ts4.setTransactionManager(transactionManager);
    
    		JobExecutionDecider decider = new JobExecutionDecider() {
    
    			@Override
    			public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
    
    				return new FlowExecutionStatus("SWITCH_A");
    			}
    		};
    
    
    		FlowJob flowJob = new FlowJob("flow_job");
    
    		SimpleFlow simpleFlow = new SimpleFlow("simple_job");
    		List<StateTransition> transitions = new ArrayList<StateTransition>();
    
    
    		transitions.add(StateTransition.createStateTransition(new StepState(ts1), "decision"));
    		transitions.add(StateTransition.createStateTransition(new DecisionState(decider, "decision"), "join2"));
    		transitions.add(StateTransition.createStateTransition(new DecisionState(decider, "decision"), "SWITCH_A", "join3"));
    		
    		transitions.add(StateTransition.createStateTransition(new DecisionState(decider, "decision"), "SWITCH_B", "join4"));
    
    		transitions.add(StateTransition.createStateTransition(new StepState(ts2), ExitStatus.COMPLETED.getExitCode(), "end2"));
    		transitions.add(StateTransition.createStateTransition(new StepState(ts2), ExitStatus.FAILED.getExitCode(), "end3"));
    
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end2")));
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.FAILED, "end3")));
    
    		transitions.add(StateTransition.createStateTransition(new StepState(ts3), ExitStatus.COMPLETED.getExitCode(), "end4"));
    		transitions.add(StateTransition.createStateTransition(new StepState(ts3), ExitStatus.FAILED.getExitCode(), "end5"));
    
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end4")));
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.FAILED, "end5")));
    		
    		
    		transitions.add(StateTransition.createStateTransition(new StepState(ts4), ExitStatus.COMPLETED.getExitCode(), "end6"));
    		transitions.add(StateTransition.createStateTransition(new StepState(ts4), ExitStatus.FAILED.getExitCode(), "end7"));
    
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end6")));
    		transitions.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.FAILED, "end7")));
    
    
    		simpleFlow.setStateTransitions(transitions);
    		simpleFlow.afterPropertiesSet();
    
    		flowJob.setFlow(simpleFlow);
    		flowJob.setJobRepository(jobRepository);
    		flowJob.afterPropertiesSet();
    
    		try {
    			jobLauncher.run(flowJob, jobPerameter);
    		} catch (Exception e) {
    			System.out.println(" Job Launching Error : " + e);
    		}
    		
    	}
    	
    	
    
    }

    The Tasklet :

    Code:
    import java.io.File;
    
    import org.springframework.batch.core.StepContribution;
    import org.springframework.batch.core.scope.context.ChunkContext;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.repeat.RepeatStatus;
    
    public class TJoin implements Tasklet {
    
    	private String name;
    	private int i;
    
    	public int getI() {
    		return i;
    	}
    
    	public void setI(int i) {
    		this.i = i;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	@Override
    	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
    		try {
    			File file = new File("d:\\ttt\\" + this.getName() + ".TXT");
    			System.out.println(" Thread " + this.getName() + " is going to sleep");
    			Thread.sleep(1000 * i);
    			System.out.println(" Thread " + this.getName() + " wakedup");
    			file.createNewFile();
    		} catch (Exception e) {
    			System.out.println(" Error :: " + e);
    		}
    		return RepeatStatus.FINISHED;
    	}
    
    	public TJoin(String name, int i) {
    		super();
    		this.name = name;
    		this.i = i;
    	}
    
    }
    Last edited by paragflume; Mar 6th, 2013 at 04:45 AM.

  3. #3
    Join Date
    Dec 2011
    Posts
    25

    Default

    Hi All,

    Here is example of how to create Split Flow jobs ( Spring Batch jobs runs in parallel ) :

    Code:
    import java.util.ArrayList;
    import java.util.List;
    
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.core.job.flow.Flow;
    import org.springframework.batch.core.job.flow.FlowExecutionStatus;
    import org.springframework.batch.core.job.flow.FlowJob;
    import org.springframework.batch.core.job.flow.support.SimpleFlow;
    import org.springframework.batch.core.job.flow.support.StateTransition;
    import org.springframework.batch.core.job.flow.support.state.EndState;
    import org.springframework.batch.core.job.flow.support.state.SplitState;
    import org.springframework.batch.core.job.flow.support.state.StepState;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.batch.core.launch.support.SimpleJobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.core.step.tasklet.TaskletStep;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.transaction.PlatformTransactionManager;
    
    public class JoinParallelStart {
    
    	public static void main(String[] args) throws Exception {
    		JoinParallelStart s = new JoinParallelStart();
    		s.runShellAction("");
    	}
    
    	public void runShellAction(String command) throws Exception {
    
    		ApplicationContext context = new ClassPathXmlApplicationContext("context.xml");
    
    		JobRepository jobRepository = (JobRepository) context.getBean("jobRepository");
    
    		PlatformTransactionManager transactionManager = (PlatformTransactionManager) context.getBean("transactionManager");
    
    		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
    		SimpleJobLauncher simpleJobLauncher = (SimpleJobLauncher) jobLauncher;
    		simpleJobLauncher.setJobRepository(jobRepository);
    		simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    		jobLauncher = simpleJobLauncher;
    
    		JobParameters jobPerameter = new JobParametersBuilder().addDate("date_param", new java.util.Date()).toJobParameters();
    
    		Tasklet t1 = new TJoin("join1", 270);
    		TaskletStep ts1 = new TaskletStep("join1");
    		ts1.setTasklet(t1);
    		ts1.setJobRepository(jobRepository);
    		ts1.setTransactionManager(transactionManager);
    
    		Tasklet t2 = new TJoin("join2", 30);
    		TaskletStep ts2 = new TaskletStep("join2");
    		ts2.setTasklet(t2);
    		ts2.setJobRepository(jobRepository);
    		ts2.setTransactionManager(transactionManager);
    
    		Tasklet t3 = new TJoin("join3", 30);
    		TaskletStep ts3 = new TaskletStep("join3");
    		ts3.setTasklet(t3);
    		ts3.setJobRepository(jobRepository);
    		ts3.setTransactionManager(transactionManager);
    
    		Tasklet t4 = new TJoin("join4", 30);
    		TaskletStep ts4 = new TaskletStep("join4");
    		ts4.setTasklet(t4);
    		ts4.setJobRepository(jobRepository);
    		ts4.setTransactionManager(transactionManager);
    
    		FlowJob flowJob = new FlowJob("flow_job");
    
    		SimpleFlow simpleFlow1 = new SimpleFlow("simple_job_1");
    		List<StateTransition> transitions1 = new ArrayList<StateTransition>();
    
    		transitions1.add(StateTransition.createStateTransition(new StepState(ts1), "join2"));
    		transitions1.add(StateTransition.createStateTransition(new StepState(ts2), "end0"));
    		transitions1.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end0")));
    
    		simpleFlow1.setStateTransitions(transitions1);
    
    		SimpleFlow simpleFlow2 = new SimpleFlow("simple_job_2");
    		List<StateTransition> transitions2 = new ArrayList<StateTransition>();
    
    		transitions2.add(StateTransition.createStateTransition(new StepState(ts3), "join4"));
    		transitions2.add(StateTransition.createStateTransition(new StepState(ts4), "end1"));
    		transitions2.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end1")));
    		simpleFlow2.setStateTransitions(transitions2);
    
    		List<Flow> flows = new ArrayList<Flow>();
    		flows.add(simpleFlow1);
    		flows.add(simpleFlow2);
    
    		SplitState splitState = new SplitState(flows, "split_00");
    		splitState.setTaskExecutor(new SimpleAsyncTaskExecutor());
    
    		SimpleFlow ssplitFlow3 = new SimpleFlow("split_flow_3");
    		List<StateTransition> transitions3 = new ArrayList<StateTransition>();
    		transitions3.add(StateTransition.createStateTransition(splitState, "end_split_0"));
    		transitions3.add(StateTransition.createEndStateTransition(new EndState(FlowExecutionStatus.COMPLETED, "end_split_0")));
    		ssplitFlow3.setStateTransitions(transitions3);
    
    		flowJob.setFlow(ssplitFlow3);
    		flowJob.setJobRepository(jobRepository);
    		flowJob.afterPropertiesSet();
    
    		try {
    			jobLauncher.run(flowJob, jobPerameter);
    		} catch (Exception e) {
    			System.out.println(" Job Launching Error : " + e);
    		}
    
    	}
    
    }

  4. #4
    Join Date
    Dec 2011
    Posts
    25

    Default

    Hi All,

    This linkhttps://fisheye.springsource.org/bro...s.java?hb=true helped me a lot.


    Thanks
    Parag Phatowali

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •