
Thread: PartitionHandler: step.execute() throws exception on remote machine

  1. #1
    Join Date
    Mar 2009
    Posts
    10

    PartitionHandler: step.execute() throws exception on remote machine

    Hello,

    I've written a PartitionHandler for a compute grid framework that executes steps on remote machines. These machines may be on the same network as the master step, or they may be on different networks, so I cannot rely on the remote step having access to the Spring Batch job repository. I can simulate this on my machine by using an in-memory job repository and running the partitioned steps in a separate JVM. Unfortunately I receive the following exception:

    Code:
    java.lang.IllegalArgumentException: step executions for given job execution are expected to be already saved
        at org.springframework.util.Assert.notNull(Assert.java:112)
        at org.springframework.batch.core.repository.dao.MapStepExecutionDao.updateStepExecution(MapStepExecutionDao.java:79)
        at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:167)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:307)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
        at $Proxy2.update(Unknown Source)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:185)
        at uk.ac.ebi.interpro.scan.batch.RemoteStepExecutor.execute(RemoteStepExecutor.java:39)

    The following RemoteStepExecutor class runs on the remote node. It holds a reference to the StepExecution, the location of the application context file and the step name, so the step can be created on the remote node (some lines have been removed for clarity):

    Code:
    public class RemoteStepExecutor implements Serializable {
    
        private final String stepName;
        private final String appContextLocation;
        private final StepExecution stepExecution;
    
        // Constructor omitted for clarity
    
        public StepExecution execute() throws JobInterruptedException {
            // Load the step definition from the application context available on the remote node
            Step step = (Step) new ClassPathXmlApplicationContext(appContextLocation).getBean(stepName, Step.class);
            step.execute(stepExecution);
            return stepExecution;
        }
    
    }
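    Since RemoteStepExecutor is Serializable, the grid framework presumably ships a copy of it, including the StepExecution it holds, to the worker JVM. The plain-JDK sketch below models why execute() returns the StepExecution: changes the step makes to the worker's copy only reach the master when that copy is sent back. `ToyStepExecution` is a hypothetical stand-in for the Spring Batch class, and an in-memory serialization round trip stands in for the network hop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CopySemanticsDemo {

    // Hypothetical stand-in for StepExecution: a step name and a mutable status.
    static class ToyStepExecution implements Serializable {
        private static final long serialVersionUID = 1L;
        final String stepName;
        String status = "STARTING";

        ToyStepExecution(String stepName) {
            this.stepName = stepName;
        }
    }

    // Serialize and deserialize, as a remoting layer does when an object crosses JVMs.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ToyStepExecution onMaster = new ToyStepExecution("partition0");

        // "Send" the execution to the worker, which runs the step and updates its copy.
        ToyStepExecution onWorker = roundTrip(onMaster);
        onWorker.status = "COMPLETED";

        // The master's object is unchanged...
        System.out.println("master before return: " + onMaster.status);

        // ...until the worker returns its copy (a second round trip).
        ToyStepExecution returned = roundTrip(onWorker);
        System.out.println("returned to master: " + returned.status);
    }
}
```

    The same reasoning applies to any state the remote step records in its own repository: unless it travels back with the returned object, the master never sees it.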

    The exception originates in AbstractStep's execute() method (line 185):

    Code:
    getJobRepository().update(stepExecution);

    ...which, in my case, calls MapStepExecutionDao.updateStepExecution() (lines 78-79):

    Code:
    Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
    Assert.notNull(executions, "step executions for given job execution are expected to be already saved");

    Any ideas on how to run a step without access to the job repository?

    Thanks,

    Antony

  2. #2
    Join Date
    Jun 2005
    Posts
    4,232


    Short answer: you can't. Long answer: you need an implementation of JobRepository that works for your remote nodes. Ideally it needs to be transactional, so I once thought of writing a JMS implementation, but never got round to it. There might be use cases where you can weaken the transaction semantics (maybe yours is one of those) and make a web service, or even a local virtual repository that creates StepExecutions on the fly. Or you could try a local off-the-shelf repository for each worker. In that case you would also need to pass the ExecutionContext and JobParameters to the remote worker, so they can be used to create the right starting conditions for the Step.
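    The "local virtual repository" idea can be sketched in plain Java under some loud assumptions: JobParameters and ExecutionContext are flattened to string maps, and `WorkUnit` and `VirtualRepository` are hypothetical names, not Spring Batch types. What the sketch shows is the shape: the master ships everything the step needs to start, and the worker-side store creates entries on the fly instead of asserting that they were saved earlier, giving up transactional guarantees in exchange.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class VirtualRepositorySketch {

    // Hypothetical payload sent to each worker: the step name plus the JobParameters
    // and ExecutionContext (flattened to string maps here) needed to rebuild its starting state.
    record WorkUnit(String stepName,
                    HashMap<String, String> jobParameters,
                    HashMap<String, String> executionContext) implements Serializable {}

    // Hypothetical worker-local "virtual" repository: no transactions and no shared database.
    // A step it has not seen is created on the fly instead of being rejected.
    static class VirtualRepository {
        private final Map<String, Map<String, String>> contexts = new HashMap<>();

        Map<String, String> startingContext(WorkUnit unit) {
            // First sighting: seed with a copy of the ExecutionContext shipped by the master.
            Map<String, String> stored = contexts.computeIfAbsent(
                    unit.stepName(), name -> new HashMap<>(unit.executionContext()));
            // Hand out a copy; changes are only recorded through updateExecutionContext().
            return new HashMap<>(stored);
        }

        void updateExecutionContext(String stepName, Map<String, String> context) {
            contexts.put(stepName, new HashMap<>(context));
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> jobParameters = new HashMap<>();
        jobParameters.put("input.file", "chunk-0.txt"); // illustrative values only
        HashMap<String, String> executionContext = new HashMap<>();
        executionContext.put("start.line", "0");

        WorkUnit unit = new WorkUnit("partition0", jobParameters, executionContext);
        VirtualRepository repository = new VirtualRepository();

        // The step would read its inputs from unit.jobParameters() and resume from this context.
        Map<String, String> context = repository.startingContext(unit);
        context.put("start.line", "500");
        repository.updateExecutionContext(unit.stepName(), context);

        System.out.println(repository.startingContext(unit).get("start.line"));
    }
}
```

    Because nothing on the worker is shared or transactional here, the repository on the master remains the only durable record; the worker's state only matters until it is sent back.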

  3. #3
    Join Date
    Mar 2009
    Posts
    10


    Dave,

    Thanks for the reply, I thought this might be the case.

    For the job repositories, I will use a MySQL database on the local node and in-memory off-the-shelf implementations for the remote nodes (MapExecutionContextDao, MapStepExecutionDao, etc.).

    To populate the remote repository, I assume it's just a question of serialising the StepExecution and calling MapJobExecutionDao.saveJobExecution(stepExecution.getJobExecution()), etc. on the remote node before calling step.execute(). Is that correct?
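    The assertion in MapStepExecutionDao fails because the worker JVM starts with an empty in-memory store, so nothing is registered under the job execution id when update() is called. A toy plain-JDK model of that check, and of the save-before-update order described above, might look like this. `ToyStepExecutionStore` is hypothetical and only mimics the shape of the map-based check quoted in the first post; it is not the Spring Batch implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class ToyStepExecutionStore {

    // jobExecutionId -> (stepExecutionId -> status), loosely mirroring a map-based DAO.
    private final Map<Long, Map<Long, String>> executionsByJobExecutionId = new HashMap<>();

    // Register a step execution under its job execution id.
    public void save(long jobExecutionId, long stepExecutionId, String status) {
        executionsByJobExecutionId
                .computeIfAbsent(jobExecutionId, id -> new HashMap<>())
                .put(stepExecutionId, status);
    }

    // Update an existing step execution; fails if its job execution was never saved here.
    public void update(long jobExecutionId, long stepExecutionId, String status) {
        Map<Long, String> executions = executionsByJobExecutionId.get(jobExecutionId);
        if (executions == null) {
            throw new IllegalArgumentException(
                    "step executions for given job execution are expected to be already saved");
        }
        executions.put(stepExecutionId, status);
    }

    public String status(long jobExecutionId, long stepExecutionId) {
        Map<Long, String> executions = executionsByJobExecutionId.get(jobExecutionId);
        return executions == null ? null : executions.get(stepExecutionId);
    }

    public static void main(String[] args) {
        // Fresh worker JVM: nothing saved yet, so update() fails like the exception in the first post.
        ToyStepExecutionStore worker = new ToyStepExecutionStore();
        try {
            worker.update(1L, 10L, "STARTED");
        } catch (IllegalArgumentException e) {
            System.out.println("update before save: " + e.getMessage());
        }

        // Populate the store first, then let the step update it.
        worker.save(1L, 10L, "STARTING");
        worker.update(1L, 10L, "COMPLETED");
        System.out.println("after save + update: " + worker.status(1L, 10L));
    }
}
```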

    The remote node returns StepExecutions to PartitionHandler.handle(), which are returned to PartitionStep as a Collection. Given that PartitionStep will aggregate and persist the collection of StepExecutions, is there any value in persisting the individual StepExecutions from the remote node to the local (MySQL) repository in PartitionHandler.handle()?

    Thanks,

    Antony

  4. #4
    Join Date
    Jun 2005
    Posts
    4,232


    Interesting model. I would like to know if it works.

    I would assume that you don't need the *Dao interfaces directly to persist your serialized stuff remotely; the JobRepository should be enough.

    The whole partition of StepExecution instances should be saved locally by the PartitionStep (I can't check right now, but that makes sense to me). There is certainly value in updating them when they come back, otherwise there is no way to know which ones were successful and don't need to be restarted if there is a partial failure.
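    The point about partial failure can be shown with a small plain-JDK toy: once the master has recorded the status each partition's StepExecution came back with, only the partitions that did not complete need to run again. The `Status` enum (named after a few BatchStatus values), the partition names and `partitionsToRestart` are hypothetical; this is not how PartitionStep or the job repository implement restart.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RestartSelection {

    // Hypothetical subset of step statuses, named after Spring Batch's BatchStatus values.
    enum Status { COMPLETED, FAILED, STOPPED, STARTED }

    // Partitions that must run again: anything whose recorded status is not COMPLETED.
    static List<String> partitionsToRestart(Map<String, Status> recorded) {
        List<String> restart = new ArrayList<>();
        for (Map.Entry<String, Status> e : recorded.entrySet()) {
            if (e.getValue() != Status.COMPLETED) {
                restart.add(e.getKey());
            }
        }
        return restart;
    }

    public static void main(String[] args) {
        Map<String, Status> recorded = new LinkedHashMap<>();
        recorded.put("partition0", Status.COMPLETED);
        recorded.put("partition1", Status.FAILED);
        recorded.put("partition2", Status.COMPLETED);
        // partition3 never reported back, so the master still has it as STARTED.
        recorded.put("partition3", Status.STARTED);

        System.out.println(partitionsToRestart(recorded)); // [partition1, partition3]
    }
}
```

    Without the returned executions being written back, every partition would still look STARTED on the master, and all of them would have to be rerun.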

  5. #5
    Join Date
    Mar 2009
    Posts
    10


    Hello,

    I got this working by using a "dummy" job repository on the remote node:

    Code:
    public class RemoteJobRepository implements JobRepository {
    
    ...
    
        public void add(StepExecution stepExecution) {
            throw new UnsupportedOperationException();
        }
    
        public void update(StepExecution stepExecution) {
            LOGGER.info("update: " + stepExecution.toString());
        }
    
        public void updateExecutionContext(StepExecution stepExecution) {
            LOGGER.info("updateExecutionContext: " + stepExecution.getExecutionContext().toString());
        }
    
    ...
    
    }

    I then save the step executions on the master node by calling the same methods as in AbstractJob (jobRepository.update and jobRepository.updateExecutionContext):

    Code:
    for (StepExecution stepExecution : results) {
      jobRepository.updateExecutionContext(stepExecution);
      jobRepository.update(stepExecution);
    }

    Failures on the remote node are communicated back to the master node as a ProActive TaskException:

    Code:
    catch (TaskException e) {
      throw (e);
    }

    One question: do I need to update the job repository in the above catch block?

    I'll include the full code listing in a separate post below.

    Cheers,

    Antony

  6. #6
    Join Date
    Mar 2009
    Posts
    10


    Code:
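    import org.apache.log4j.Logger;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobInstance;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
    import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.repository.JobRestartException;
    
    /**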
     * Implementation of {@link JobRepository} that allows partitioned steps to run on remote nodes with no access
     * to the job repository on the master node.
     *
     * @author  Antony Quinn
     * @version $Id: RemoteJobRepository.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
     * @since   1.0
     */
    public class RemoteJobRepository implements JobRepository {
    
        private static final Logger LOGGER = Logger.getLogger(RemoteJobRepository.class);
    
        /**
         * Provide default constructor with low visibility in case user wants to use
         * aop:proxy-target-class="true" for AOP interceptor.
         */
        RemoteJobRepository() {
        }
    
        public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
            throw new UnsupportedOperationException();
        }
    
        public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
                throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
            throw new UnsupportedOperationException();
        }
    
        public void update(JobExecution jobExecution) {
            throw new UnsupportedOperationException();
        }
    
        public void add(StepExecution stepExecution) {
            throw new UnsupportedOperationException();
        }
    
        public void update(StepExecution stepExecution) {
            LOGGER.info("update: " + stepExecution.toString());
        }
    
        public void updateExecutionContext(StepExecution stepExecution) {
            LOGGER.info("updateExecutionContext: " + stepExecution.getExecutionContext().toString());
        }
    
        public void updateExecutionContext(JobExecution jobExecution) {
            throw new UnsupportedOperationException();
        }
    
        public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
            throw new UnsupportedOperationException();
        }
    
        public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
            throw new UnsupportedOperationException();
        }
    
        public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
            throw new UnsupportedOperationException();
        }
    
    }
    Code:
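    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    
    import org.objectweb.proactive.extensions.masterworker.ProActiveMaster;
    import org.objectweb.proactive.extensions.masterworker.TaskException;
    import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
    import org.objectweb.proactive.core.ProActiveException;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.partition.PartitionHandler;
    import org.springframework.batch.core.partition.StepExecutionSplitter;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.core.io.Resource;
    import org.springframework.util.Assert;
    import uk.ac.ebi.interpro.scan.batch.partition.remote.RemoteStepExecutor;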
    
    /**
     * Executes steps on remote nodes using ProActive Parallel Suite's Master Worker API.
     *
     * @author  Antony Quinn
     * @version $Id: ProActiveMasterWorkerPartitionHandler.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
     * @see     <a href="http://proactive.inria.fr/">ProActive Parallel Suite</a>
     * @since   1.0
     */
    public class ProActiveMasterWorkerPartitionHandler implements PartitionHandler, InitializingBean {
    
        private Step step;
        private Resource applicationDescriptor;
        private String remoteLaunchContext;
        private JobRepository jobRepository;
        private boolean persistRemoteStepExecutions = true;    
    
        // Not sure if we need this...
        private int gridSize = 1;
    
        public void afterPropertiesSet() throws Exception {
            Assert.notNull(applicationDescriptor, "An ApplicationDescriptor must be provided");
            Assert.notNull(remoteLaunchContext,   "A RemoteLaunchContext must be provided");
            Assert.notNull(step,                  "A Step must be provided");
            if (persistRemoteStepExecutions)    {
                Assert.notNull(jobRepository, "A JobRepository must be provided");
            }
        }
    
        public void setApplicationDescriptor(Resource applicationDescriptor) {
            this.applicationDescriptor = applicationDescriptor;
        }
    
        public void setJobRepository(JobRepository jobRepository) {
            this.jobRepository = jobRepository;
        }
    
        public void setPersistRemoteStepExecutions(boolean persistRemoteStepExecutions) {
            this.persistRemoteStepExecutions = persistRemoteStepExecutions;
        }
    
        public void setRemoteLaunchContext(String remoteLaunchContext) {
            this.remoteLaunchContext = remoteLaunchContext;
        }
    
        public void setStep(Step step) {
            this.step = step;
        }   
    
        /**
         * @see PartitionHandler#handle(StepExecutionSplitter, StepExecution)
         */    
        public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, 
                                                StepExecution masterStepExecution) throws Exception {
    
            // Tasks
            List<Task<StepExecution>> tasks = new ArrayList<Task<StepExecution>>();
    
            // Results
            Collection<StepExecution> results = null;
    
            // Master
            ProActiveMaster<Task<StepExecution>, StepExecution> master =
                    new ProActiveMaster<Task<StepExecution>, StepExecution>();
    
            try {  
    
                // Add virtual nodes
                master.addResources(applicationDescriptor.getURL());
    
                // Create tasks
                for (final StepExecution stepExecution : stepExecutionSplitter.split(masterStepExecution, gridSize)) {
                    RemoteStepExecutor remoteStepExecutor =
                            new RemoteStepExecutor(remoteLaunchContext, step.getName(), stepExecution);
                    Task<StepExecution> task = new ProActiveStepExecutorTask(remoteStepExecutor);
                    tasks.add(task);
                }
    
                // Run tasks
                master.solve(tasks);
    
                // Collect results
                results = master.waitAllResults();
                
            }
            catch (ProActiveException e)    {
                // Couldn't add resources
                throw (e);
            }
            catch (TaskException e) {
                throw (e);
            }
            finally {
                // Shutdown ProActive nodes
                master.terminate(true);
            }
    
            // Persist step executions received from remote node
            if (persistRemoteStepExecutions)  {
                for (StepExecution stepExecution : results) {
                    jobRepository.updateExecutionContext(stepExecution);
                    jobRepository.update(stepExecution);
                }
            }        
    
            return results;
    
        }
    
    }
    Code:
    import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
    import org.objectweb.proactive.extensions.masterworker.interfaces.WorkerMemory;
    import org.objectweb.proactive.extensions.annotation.RemoteObject;
    import org.springframework.batch.core.StepExecution;
    import uk.ac.ebi.interpro.scan.batch.partition.remote.RemoteStepExecutor;
    
    /**
     * Executes step on remote node as a ProActive task.
     *
     * @author  Antony Quinn
     * @version $Id: ProActiveStepExecutorTask.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
     * @since   1.0
     */
    @RemoteObject
    public class ProActiveStepExecutorTask implements Task<StepExecution> {
    
        private final RemoteStepExecutor remoteStepExecutor;
    
        public ProActiveStepExecutorTask(RemoteStepExecutor remoteStepExecutor)    {
            this.remoteStepExecutor = remoteStepExecutor;
        }
    
        public StepExecution run(WorkerMemory workerMemory) throws Exception {
            return remoteStepExecutor.execute();
        }
       
    }
    Code:
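    import java.io.Serializable;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.JobInterruptedException;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**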
     * Executes step on remote node.
     *
     * @author  Antony Quinn
     * @version $Id: RemoteStepExecutor.java,v 1.2 2009/06/19 09:30:40 aquinn Exp $
     * @since   1.0
     */
    public class RemoteStepExecutor implements Serializable {
    
        private static final Log logger = LogFactory.getLog(RemoteStepExecutor.class);
    
        private final StepExecution stepExecution;
        private final String stepName;
        private final String appContextLocation;
    
        public RemoteStepExecutor(String appContextLocation, String stepName, StepExecution stepExecution) {
            this.appContextLocation = appContextLocation;
            this.stepName           = stepName;
            this.stepExecution      = stepExecution;
        }
    
        public StepExecution execute() throws JobInterruptedException {
            // Log stepExecution parameters
            logger.info(stepExecution);
            logger.info(stepExecution.getJobExecution().getJobInstance());
            // Get step to run
            ApplicationContext context = new ClassPathXmlApplicationContext(appContextLocation);
            Step step = (Step) context.getBean(stepName, Step.class);
            try {
                step.execute(stepExecution);
            }
            catch (JobInterruptedException e) {
                stepExecution.getJobExecution().setStatus(BatchStatus.STOPPING);
                throw (e);
            }
            return stepExecution;
        }
    
        public String getStepName() {
            return stepName;
        }
    
    }

  7. #7
    Join Date
    Jun 2005
    Posts
    4,232


    Seems like a good start. I'm confused about the results in the case of partial failure though. Surely you need to obtain the StepExecution result for each node, whether or not it was successful, and hand them back to the PartitionStep? It looks like you bomb out of the PartitionHandler without collecting enough information.
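    The suggestion can be sketched with the JDK's ExecutorService standing in for the ProActive master (this does not use or reflect the ProActive API): wait for every task, turn a failure into a FAILED result for that partition, and hand the complete list back so the master can persist one entry per partition. `PartitionResult` and the "bad" partition names are hypothetical. In the real handler the failed entry would presumably be that partition's StepExecution marked as failed, persisted through the same update loop as the successful ones rather than in a separate catch block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CollectAllResults {

    // Hypothetical per-partition result: what the master needs to persist for every partition.
    record PartitionResult(String partition, String status, String error) {}

    // Run every partition and return one result per partition, even when some fail,
    // instead of rethrowing the first failure and losing the others.
    static List<PartitionResult> runAll(List<String> partitions, ExecutorService executor) {
        List<Future<PartitionResult>> futures = new ArrayList<>();
        for (String partition : partitions) {
            Callable<PartitionResult> task = () -> {
                // Stand-in for executing the step remotely; "bad" partitions throw.
                if (partition.startsWith("bad")) {
                    throw new IllegalStateException("step failed on " + partition);
                }
                return new PartitionResult(partition, "COMPLETED", null);
            };
            futures.add(executor.submit(task));
        }

        List<PartitionResult> results = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                results.add(futures.get(i).get());
            } catch (ExecutionException e) {
                // Record the failure against its partition rather than aborting the collection.
                results.add(new PartitionResult(partitions.get(i), "FAILED", e.getCause().getMessage()));
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up; the caller decides what to do next.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for partitions", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (PartitionResult r : runAll(List.of("p0", "bad1", "p2"), executor)) {
                System.out.println(r);
            }
        } finally {
            executor.shutdown();
        }
    }
}
```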
