Code:
/**
 * Implementation of {@link JobRepository} that allows partitioned steps to run on remote nodes with no access
 * to the job repository on the master node.
 * <p>
 * Only {@link #update(StepExecution)} and {@link #updateExecutionContext(StepExecution)} are supported; both
 * simply log their argument at INFO level and persist nothing. Every other operation throws
 * {@link UnsupportedOperationException}.
 *
 * @author Antony Quinn
 * @version $Id: RemoteJobRepository.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
 * @since 1.0
 */
public class RemoteJobRepository implements JobRepository {

    /** Class-wide logger; never reassigned, hence final. */
    private static final Logger LOGGER = Logger.getLogger(RemoteJobRepository.class);

    /**
     * Provide default constructor with low visibility in case user wants to
     * use aop:proxy-target-class="true" for AOP interceptor.
     */
    RemoteJobRepository() {
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public void update(JobExecution jobExecution) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public void add(StepExecution stepExecution) {
        throw new UnsupportedOperationException();
    }

    /**
     * Logs the step execution at INFO level; nothing is stored.
     *
     * @param stepExecution step execution to report (must not be null)
     */
    public void update(StepExecution stepExecution) {
        LOGGER.info("update: " + stepExecution.toString());
    }

    /**
     * Logs the step execution's execution context at INFO level; nothing is stored.
     *
     * @param stepExecution step execution whose context is reported (must not be null)
     */
    public void updateExecutionContext(StepExecution stepExecution) {
        LOGGER.info("updateExecutionContext: " + stepExecution.getExecutionContext().toString());
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public void updateExecutionContext(JobExecution jobExecution) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this repository.
     *
     * @throws UnsupportedOperationException always
     */
    public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
        throw new UnsupportedOperationException();
    }
}
Code:
import org.objectweb.proactive.extensions.masterworker.ProActiveMaster;
import org.objectweb.proactive.extensions.masterworker.TaskException;
import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
import org.objectweb.proactive.core.ProActiveException;
import uk.ac.ebi.interpro.scan.batch.partition.remote.RemoteStepExecutor;
/**
 * Executes steps on remote nodes using ProActive Parallel Suite's Master Worker API.
 * <p>
 * The master step execution is split into partitions, each partition is wrapped in a
 * {@link ProActiveStepExecutorTask} and submitted to a {@link ProActiveMaster}. Once all results are
 * back they are optionally written to the configured {@link JobRepository}.
 *
 * @author Antony Quinn
 * @version $Id: ProActiveMasterWorkerPartitionHandler.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
 * @see <a href="http://proactive.inria.fr/">ProActive Parallel Suite</a>
 * @since 1.0
 */
public class ProActiveMasterWorkerPartitionHandler implements PartitionHandler, InitializingBean {

    private Step step;
    private Resource applicationDescriptor;
    private String remoteLaunchContext;
    private JobRepository jobRepository;
    private boolean persistRemoteStepExecutions = true;

    /** Value handed to {@link StepExecutionSplitter#split(StepExecution, int)}; defaults to 1. */
    private int gridSize = 1;

    /**
     * Verifies that all mandatory properties have been set.
     * The job repository is only mandatory when remote step executions are to be persisted.
     *
     * @throws Exception if a required property is missing or the grid size is not positive
     */
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(applicationDescriptor, "An ApplicationDescriptor must be provided");
        Assert.notNull(remoteLaunchContext, "A RemoteLaunchContext must be provided");
        Assert.notNull(step, "A Step must be provided");
        Assert.isTrue(gridSize > 0, "GridSize must be greater than zero");
        if (persistRemoteStepExecutions) {
            Assert.notNull(jobRepository, "A JobRepository must be provided");
        }
    }

    /**
     * @param applicationDescriptor resource whose URL is passed to {@link ProActiveMaster#addResources}
     */
    public void setApplicationDescriptor(Resource applicationDescriptor) {
        this.applicationDescriptor = applicationDescriptor;
    }

    /**
     * @param gridSize value passed to the splitter when partitioning the master step execution;
     *                 must be greater than zero (default is 1)
     */
    public void setGridSize(int gridSize) {
        this.gridSize = gridSize;
    }

    /**
     * @param jobRepository repository used to persist step executions returned by remote nodes
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * @param persistRemoteStepExecutions whether results returned by remote nodes are written to the
     *                                    job repository (default is {@code true})
     */
    public void setPersistRemoteStepExecutions(boolean persistRemoteStepExecutions) {
        this.persistRemoteStepExecutions = persistRemoteStepExecutions;
    }

    /**
     * @param remoteLaunchContext application context location handed to each {@link RemoteStepExecutor}
     */
    public void setRemoteLaunchContext(String remoteLaunchContext) {
        this.remoteLaunchContext = remoteLaunchContext;
    }

    /**
     * @param step step whose name is handed to each {@link RemoteStepExecutor}
     */
    public void setStep(Step step) {
        this.step = step;
    }

    /**
     * Splits the master step execution, runs every partition as a ProActive task and returns the
     * step executions produced by the workers.
     *
     * @param stepExecutionSplitter splitter used to partition the master step execution
     * @param masterStepExecution   step execution to partition
     * @return step executions returned by the remote tasks
     * @throws Exception if resources cannot be added, a task fails, or splitting fails
     * @see PartitionHandler#handle(StepExecutionSplitter, StepExecution)
     */
    public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
                                            StepExecution masterStepExecution) throws Exception {
        List<Task<StepExecution>> tasks = new ArrayList<Task<StepExecution>>();
        Collection<StepExecution> results;
        ProActiveMaster<Task<StepExecution>, StepExecution> master =
                new ProActiveMaster<Task<StepExecution>, StepExecution>();
        try {
            // Add virtual nodes
            master.addResources(applicationDescriptor.getURL());
            // Create one task per partition
            for (StepExecution stepExecution : stepExecutionSplitter.split(masterStepExecution, gridSize)) {
                RemoteStepExecutor remoteStepExecutor =
                        new RemoteStepExecutor(remoteLaunchContext, step.getName(), stepExecution);
                tasks.add(new ProActiveStepExecutorTask(remoteStepExecutor));
            }
            // Run tasks and block until every result is available
            master.solve(tasks);
            results = master.waitAllResults();
        }
        finally {
            // Shutdown ProActive nodes, whether or not the tasks succeeded
            master.terminate(true);
        }
        // Persist step executions received from remote node
        if (persistRemoteStepExecutions) {
            for (StepExecution stepExecution : results) {
                jobRepository.updateExecutionContext(stepExecution);
                jobRepository.update(stepExecution);
            }
        }
        return results;
    }
}
Code:
import org.objectweb.proactive.extensions.masterworker.interfaces.Task;
import org.objectweb.proactive.extensions.masterworker.interfaces.WorkerMemory;
import org.objectweb.proactive.extensions.annotation.RemoteObject;
import uk.ac.ebi.interpro.scan.batch.partition.remote.RemoteStepExecutor;
/**
 * ProActive {@link Task} that executes a step on a remote node by delegating to a
 * {@link RemoteStepExecutor}.
 *
 * @author Antony Quinn
 * @version $Id: ProActiveStepExecutorTask.java,v 1.1 2009/06/18 15:08:38 aquinn Exp $
 * @since 1.0
 */
@RemoteObject
public class ProActiveStepExecutorTask implements Task<StepExecution> {

    /** Executor that performs the actual step execution. */
    private final RemoteStepExecutor remoteStepExecutor;

    /**
     * @param remoteStepExecutor executor to delegate to when the task is run
     */
    public ProActiveStepExecutorTask(RemoteStepExecutor remoteStepExecutor) {
        this.remoteStepExecutor = remoteStepExecutor;
    }

    /**
     * Runs the wrapped executor and hands back the step execution it returns.
     *
     * @param workerMemory worker memory supplied by ProActive; not used by this task
     * @return step execution returned by the executor
     * @throws Exception if the executor fails
     */
    public StepExecution run(WorkerMemory workerMemory) throws Exception {
        final StepExecution completed = remoteStepExecutor.execute();
        return completed;
    }
}
Code:
/**
 * Executes step on remote node.
 * <p>
 * The executor is {@link Serializable}: it carries the step name, the application context location and
 * the step execution, and looks the step up in a freshly loaded application context when
 * {@link #execute()} is called.
 *
 * @author Antony Quinn
 * @version $Id: RemoteStepExecutor.java,v 1.2 2009/06/19 09:30:40 aquinn Exp $
 * @since 1.0
 */
public class RemoteStepExecutor implements Serializable {

    private static final Log logger = LogFactory.getLog(RemoteStepExecutor.class);

    private final StepExecution stepExecution;
    private final String stepName;
    private final String appContextLocation;

    /**
     * @param appContextLocation class path location of the XML application context defining the step
     * @param stepName           bean name of the step to execute
     * @param stepExecution      step execution to pass to the step
     */
    public RemoteStepExecutor(String appContextLocation, String stepName, StepExecution stepExecution) {
        this.appContextLocation = appContextLocation;
        this.stepName = stepName;
        this.stepExecution = stepExecution;
    }

    /**
     * Loads the application context, looks up the step by name and executes it with the step
     * execution supplied at construction time. The application context is closed before returning,
     * whether execution succeeds or fails, so repeated calls do not accumulate open contexts.
     *
     * @return the step execution passed to the constructor, after the step has run
     * @throws JobInterruptedException if the step is interrupted; the job execution status is set to
     *                                 {@link BatchStatus#STOPPING} before the exception is rethrown
     */
    public StepExecution execute() throws JobInterruptedException {
        // Log stepExecution parameters
        logger.info(stepExecution);
        logger.info(stepExecution.getJobExecution().getJobInstance());
        // Declared with the concrete type so that close() is available without extra imports
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(appContextLocation);
        try {
            // Get step to run
            Step step = (Step) context.getBean(stepName, Step.class);
            step.execute(stepExecution);
        }
        catch (JobInterruptedException e) {
            stepExecution.getJobExecution().setStatus(BatchStatus.STOPPING);
            throw e;
        }
        finally {
            // Release the context created for this execution
            context.close();
        }
        return stepExecution;
    }

    /**
     * @return bean name of the step this executor runs
     */
    public String getStepName() {
        return stepName;
    }
}