
Why doesn't the job process the input source upon restart?



geira
Jan 24th, 2008, 12:57 PM
I am kicking off my job through JMX. It runs OK, but when I issue the command again I just get this result back, and no input source is processed:

2008-01-24 19:54:46,776 [INFO ] [SimpleAsyncTaskExecutor-5] org.springframework.batch.execution.launch.SimpleJobLauncher - Launching: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 19:51:00 CET 2008
2008-01-24 19:54:46,782 [INFO ] [SimpleAsyncTaskExecutor-5] org.springframework.batch.execution.launch.SimpleJobLauncher - Completed successfully: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 19:51:00 CET 2008

Since I plan to trigger the command from Quartz once every 24 hours, it would be nice not to have to restart the JVM.

Do I have to reset something to have the input source read again?

Surely there should be no need to start a new VM every time a job using a RestartableItemProviderTasklet is started?

Geir

Dave Syer
Jan 24th, 2008, 01:40 PM
If you launch the same Job twice and it was successful the first time, it will not run again, since you don't want to process the same data twice. It sounds like you actually have two different instances of the same Job to run. In that case they either need different JobIdentifiers (JobParameters in m4), or you need to set restartable=false in the Job(Configuration). The restartable flag will probably also be renamed in m4 because it is confusing, but we haven't settled on a new name yet.
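A toy model may make this rule easier to see. The class below is hypothetical and is not the Spring Batch API: it assumes a job instance is identified by a (job name, key) pair and simply skips any instance that has already completed. Using the run date as the key is an assumption for illustration, showing how a daily run would get its own identity.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model, NOT the Spring Batch API. It only mimics the rule
// described above: a job instance is identified by (job name, key), and an
// instance that has already completed is not run again.
public class JobIdentityDemo {

    // Identities of job instances that completed successfully.
    private final Set<String> completed = new HashSet<>();

    /** Runs the work unless this (name, key) already completed; returns true if it ran. */
    public boolean launch(String jobName, String key, Runnable work) {
        String identity = jobName + "|" + key;
        if (completed.contains(identity)) {
            return false; // same identity, already completed: skip
        }
        work.run();
        completed.add(identity); // only reached if the work did not throw
        return true;
    }

    public static void main(String[] args) {
        JobIdentityDemo repository = new JobIdentityDemo();
        int[] runs = {0};
        Runnable work = () -> runs[0]++;

        // Same name and key twice: the second launch is skipped.
        System.out.println(repository.launch("tickerJob", "2008-01-24", work)); // true
        System.out.println(repository.launch("tickerJob", "2008-01-24", work)); // false
        // A new key (e.g. the next day's date) is a new instance and runs.
        System.out.println(repository.launch("tickerJob", "2008-01-25", work)); // true
        System.out.println(runs[0]); // 2
    }
}
```

The design point is that "don't run it twice" is decided by identity, not by job name alone, which is why either a distinct key per run or turning the check off (restartable=false in this milestone) is needed.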

geira
Jan 24th, 2008, 02:44 PM
Our case is that we want the job to process a result set from a query once every 24 hours. Is the correct approach to set restartable=false and kick off the same job name again, or should we do it another way?

The job didn't fire again, even after setting restartable=false. The response:

Hibernate: select events0_.SUBSCRIPTIONID as SUBSCRIP7_2_, events0_.EVENTID as EVENTID2_, events0_.EVENTID as EVENTID59_1_, events0_.CHANGEDBY as CHANGEDBY59_1_, events0_.LASTCHANGED as LASTCHAN3_59_1_, events0_.EVENTCODEID as EVENTCOD6_59_1_, events0_.eventDate as eventDate59_1_, events0_.eventText as eventText59_1_, events0_.SUBSCRIPTIONID as SUBSCRIP7_59_1_, eventcodeb1_.EVENTCODEID as EVENTCOD1_60_0_, eventcodeb1_.CHANGEDBY as CHANGEDBY60_0_, eventcodeb1_.LASTCHANGED as LASTCHAN3_60_0_, eventcodeb1_.description as descript4_60_0_ from EVENTS events0_ left outer join EVENT_CODES eventcodeb1_ on events0_.EVENTCODEID=eventcodeb1_.EVENTCODEID where events0_.SUBSCRIPTIONID=?
Hibernate: select subscripti0_.SUBSCRIPTIONID as SUBSCRI25_1_, subscripti0_.subscriptionLineId as subscrip1_1_, subscripti0_.subscriptionLineId as subscrip1_67_0_, subscripti0_.CHANGEDBY as CHANGEDBY67_0_, subscripti0_.LASTCHANGED as LASTCHAN3_67_0_, subscripti0_.active as active67_0_, subscripti0_.copies as copies67_0_, subscripti0_.dayDiscount as dayDisco6_67_0_, subscripti0_.dayPrice as dayPrice67_0_, subscripti0_.delivery_1 as delivery8_67_0_, subscripti0_.delivery_2 as delivery9_67_0_, subscripti0_.delivery_3 as delivery10_67_0_, subscripti0_.delivery_4 as delivery11_67_0_, subscripti0_.delivery_5 as delivery12_67_0_, subscripti0_.delivery_6 as delivery13_67_0_, subscripti0_.delivery_7 as delivery14_67_0_, subscripti0_.discount as discount67_0_, subscripti0_.distributionCost_1 as distrib16_67_0_, subscripti0_.distributionCost_2 as distrib17_67_0_, subscripti0_.distributionCost_3 as distrib18_67_0_, subscripti0_.distributionCost_4 as distrib19_67_0_, subscripti0_.distributionCost_5 as distrib20_67_0_, subscripti0_.distributionCost_6 as distrib21_67_0_, subscripti0_.distributionCost_7 as distrib22_67_0_, subscripti0_.gross as gross67_0_, subscripti0_.net as net67_0_, subscripti0_.PRODUCTLINEID as PRODUCT26_67_0_, subscripti0_.SUBSCRIPTIONID as SUBSCRI25_67_0_ from SUBSCRIPTION_LINES subscripti0_ where subscripti0_.SUBSCRIPTIONID=?
Hibernate: select events0_.SUBSCRIPTIONID as SUBSCRIP7_2_, events0_.EVENTID as EVENTID2_, events0_.EVENTID as EVENTID59_1_, events0_.CHANGEDBY as CHANGEDBY59_1_, events0_.LASTCHANGED as LASTCHAN3_59_1_, events0_.EVENTCODEID as EVENTCOD6_59_1_, events0_.eventDate as eventDate59_1_, events0_.eventText as eventText59_1_, events0_.SUBSCRIPTIONID as SUBSCRIP7_59_1_, eventcodeb1_.EVENTCODEID as EVENTCOD1_60_0_, eventcodeb1_.CHANGEDBY as CHANGEDBY60_0_, eventcodeb1_.LASTCHANGED as LASTCHAN3_60_0_, eventcodeb1_.description as descript4_60_0_ from EVENTS events0_ left outer join EVENT_CODES eventcodeb1_ on events0_.EVENTCODEID=eventcodeb1_.EVENTCODEID where events0_.SUBSCRIPTIONID=?
2008-01-24 21:56:19,469 [INFO ] [SimpleAsyncTaskExecutor-1] org.mule.extras.client.MuleClient - There is already a manager locally available to this client, no need to create a new one
2008-01-24 21:56:19,546 [INFO ] [SimpleAsyncTaskExecutor-1] org.mule.providers.vm.VMMessageDispatcher - Connected: VMMessageDispatcher{this=301cdb, endpoint=vm://deliverycalendar.deliveryday}
2008-01-24 21:56:19,610 [INFO ] [SimpleAsyncTaskExecutor-1] org.mule.providers.tcp.TcpMessageDispatcher - Connected: TcpMessageDispatcher{this=4349fe, endpoint=tcp://muletest.aftenposten.no:60701}
2008-01-24 21:56:24,083 [INFO ] [SimpleAsyncTaskExecutor-1] org.springframework.batch.execution.launch.SimpleJobLauncher - Completed successfully: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 21:55:34 CET 2008
2008-01-24 21:56:36,247 [INFO ] [SimpleAsyncTaskExecutor-2] org.springframework.batch.execution.launch.SimpleJobLauncher - Launching: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 21:55:34 CET 2008
2008-01-24 21:56:36,271 [INFO ] [SimpleAsyncTaskExecutor-2] org.springframework.batch.execution.launch.SimpleJobLauncher - Completed successfully: ScheduledJobIdentifier: name=tickerJob,key=key,scheduleDate=Thu Jan 24 21:55:34 CET 2008

The job only fetched a result set from the database the first time it was kicked off.

Any suggestions to which approach we should take?

Rgds.
Geir

lucasward
Jan 24th, 2008, 03:17 PM
Can you look at the BATCH_STEP_EXECUTION table to ensure the step is actually being executed?

You can also use the NativeJob from Quartz to launch the job in a separate process (JVM).
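If a fresh JVM per run is acceptable, the idea behind launching in a separate process can be sketched with the plain JDK. Note this swaps in `ProcessBuilder` instead of Quartz's NativeJob, and the main class `com.example.batch.RunJob` is a hypothetical entry point that would load the job XML and launch the named job; it is not part of Spring Batch.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: launches a job in a brand-new JVM so no in-memory state
// (such as an exhausted input source) survives between runs. Uses the JDK's
// ProcessBuilder rather than Quartz's NativeJob; the main class passed in is
// a hypothetical entry point, not part of Spring Batch.
public class SeparateJvmLauncher {

    /** Builds "<java> -cp <current classpath> <mainClass> <args...>". */
    public static List<String> buildCommand(String mainClass, String... args) {
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        List<String> command = new ArrayList<>();
        command.add(javaBin);
        command.add("-cp");
        command.add(System.getProperty("java.class.path"));
        command.add(mainClass);
        for (String arg : args) {
            command.add(arg);
        }
        return command;
    }

    /** Starts the child JVM, shares this process's stdout/stderr, and waits for its exit code. */
    public static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }

    public static void main(String[] args) {
        // Hypothetical entry point that would load the job XML and launch "tickerJob".
        List<String> command = buildCommand("com.example.batch.RunJob", "tickerJob");
        System.out.println(String.join(" ", command));
    }
}
```

A daily trigger would then call `run(buildCommand(...))`; assuming the entry point exits non-zero on failure, the returned exit code tells the scheduler whether the job succeeded. The trade-off is JVM startup cost per run in exchange for never sharing state between runs.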

Dave Syer
Jan 24th, 2008, 05:58 PM
What does your ItemReader/InputSource do? Maybe there is no data to process on the second run? Can you post your job configuration? (Please use the [code] brackets to post logs and code.)

geira
Jan 25th, 2008, 12:18 PM
I thought that when the job was started again, the feed/input source would be read again?

And also that, if rows matched the search criteria in the SQL query, the result set would be fed into the input source and processed once more?

Geir



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">

<description>This job imports payments from the accounting system</description>
<import resource="classpath:/springAccountBalancerCore-configurer.xml"/>
<import resource="classpath:/springAccountBalancerCore-dao.xml"/>

<bean id="simpleJob"
      class="org.springframework.batch.core.configuration.JobConfiguration"
      abstract="true">
    <property name="restartable" value="false" />
</bean>


<bean parent="stepScope" />
<bean parent="jobConfigurationRegistryBeanPostProcessor" />

<bean id="paymentImportJob" parent="simpleJob">
    <property name="steps">
        <bean id="step1"
              class="org.springframework.batch.execution.step.RepeatOperationsStepConfiguration">
            <property name="tasklet">
                <bean class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                    <property name="itemProvider">
                        <bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
                            <property name="inputSource" ref="fileInputSource" />
                        </bean>
                    </property>
                    <property name="itemProcessor">
                        <bean
                            class="no.aftenposten.accountbalancer.batch.processor.PaymentImportLineProcessor">
                            <property name="outputSource" ref="invoiceManagerOutputSource" />
                            <property name="invoicingManager" ref="invoicingManager"/>
                        </bean>
                    </property>
                </bean>
            </property>
            <property name="chunkOperations">
                <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                    <property name="interceptors" ref="invoiceManagerOutputSource" />
                    <property name="completionPolicy">
                        <bean class="org.springframework.batch.repeat.policy.SimpleCompletionPolicy">
                            <property name="chunkSize" value="1" />
                        </bean>
                    </property>
                </bean>
            </property>
            <property name="stepOperations">
                <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                    <property name="exceptionHandler">
                        <bean class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler"
                              p:limit="0" p:useParent="true" p:type="java.lang.Exception" />
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

<!-- This is a framework class that needs a delegate and also needs to be registered as a RepeatInterceptor in the chunk -->
<bean id="invoiceManagerOutputSource"
      class="no.aftenposten.accountbalancer.batch.outputsource.InvoicingManagerOutputSource" >
    <property name="invoicingDao" ref="invoicingJdbcDao"/>
</bean>

<bean id="paymentLineMapper" class="no.aftenposten.accountbalancer.batch.mapper.PaymentFieldSetMapper" />

<bean id="paymentLineDescriptor"
      class="org.springframework.batch.io.file.support.transform.DelimitedLineTokenizer">
    <property name="delimiter" value=";"/>
</bean>

<bean id="fileInputSource" class="org.springframework.batch.io.file.support.DefaultFlatFileInputSource">
    <property name="resource"
              value="classpath:paymentInput.txt" />
    <property name="tokenizer" ref="paymentLineDescriptor" />
    <property name="fieldSetMapper" ref="paymentLineMapper" />
</bean>

<bean parent="customEditorConfigurer" />

</beans>

lucasward
Jan 25th, 2008, 12:35 PM
The definition you just showed looks like it's reading from a file, not from a database using Hibernate?

Also, can you post a dump of your status tables?

geira
Jan 25th, 2008, 01:44 PM
Here is the result from the BATCH_STEP_EXECUTION table:



ID  VERSION  STEP_ID  JOB_EXECUTION_ID  START_TIME               END_TIME                 STATUS     COMMIT_COUNT  TASK_COUNT  CONTINUABLE  EXIT_CODE  EXIT_MESSAGE
1   0        1        1                 2008-01-25 20:39:00.351  2008-01-25 20:39:30.618  COMPLETED  2             2           N            COMPLETED  (null)
2   0        2        2                 2008-01-25 20:40:13.111  2008-01-25 20:40:15.528  COMPLETED  1             1           N            COMPLETED  (null)
3   0        3        3                 2008-01-25 20:41:03.45   2008-01-25 20:41:05.516  COMPLETED  1             1           N            COMPLETED  (null)

TASK_STATISTICS (one value per row above):
1: #Fri Jan 25 20:39:30 CET 2008 / sqlCursorInput.skippedRrecordCount=0 / sqlCursorInput.lastProcessedRowNum=1
2: #Fri Jan 25 20:40:15 CET 2008 / sqlCursorInput.skippedRrecordCount=0 / sqlCursorInput.lastProcessedRowNum=1
3: #Fri Jan 25 20:41:05 CET 2008 / sqlCursorInput.skippedRrecordCount=0 / sqlCursorInput.lastProcessedRowNum=1


We want to run the query "select * from SUBSCRIPTION_LINES where active = 1 and subscriptionLineId = 3" once every 24 hours and insert into other tables, though via JDBC. It doesn't seem to read the table again after having processed it once.



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">

<description>This file defines the job for the ticker process.</description>
<import resource="classpath:/springAccountBalancerCore-configurer.xml"/>
<import resource="classpath:/springAccountBalancerCore-dao.xml"/>
<import resource="classpath:/springAccountBalancerCore-biz.xml"/>

<bean id="simpleTickerJob" class="org.springframework.batch.core.configuration.JobConfiguration" abstract="true">
    <property name="restartable" value="false" />
</bean>

<bean parent="stepScope" />
<bean parent="jobConfigurationRegistryBeanPostProcessor" />

<bean id="tickerJob" parent="simpleTickerJob">
    <property name="steps">
        <bean id="tickerStep1"
              class="org.springframework.batch.execution.step.RepeatOperationsStepConfiguration">
            <property name="tasklet">
                <bean
                    class="org.springframework.batch.execution.tasklet.RestartableItemProviderTasklet">
                    <property name="itemProvider">
                        <bean class="org.springframework.batch.item.provider.InputSourceItemProvider">
                            <property name="inputSource" ref="hibernateTickerJobInputSource"/>
                        </bean>
                    </property>
                    <property name="itemProcessor">
                        <bean class="no.aftenposten.accountbalancer.batch.processor.SubscriptionLineTickerProcessor">
                            <property name="outputSource" ref="hibernateTickerJobOutputSource"/>
                            <property name="deliveryCalendarManager" ref="deliveryCalendarManager"/>
                            <property name="productLineDao" ref="productLineDao"/>
                            <property name="subscriptionDao" ref="subscriptionDao"/>
                            <property name="accountingManager" ref="accountingManager" />
                        </bean>
                    </property>
                </bean>
            </property>
            <property name="chunkOperations">
                <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                    <property name="interceptors" ref="hibernateTickerJobOutputSource"/>
                    <property name="completionPolicy">
                        <bean
                            class="org.springframework.batch.repeat.policy.SimpleCompletionPolicy">
                            <property name="chunkSize" value="1" />
                        </bean>
                    </property>
                </bean>
            </property>
            <property name="stepOperations">
                <bean class="org.springframework.batch.repeat.support.RepeatTemplate">
                    <property name="exceptionHandler">
                        <bean
                            class="org.springframework.batch.repeat.exception.handler.SimpleLimitExceptionHandler"
                            p:limit="2" p:useParent="true" p:type="java.lang.Exception" />
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

<!-- This is a framework class that needs a delegate and also needs to be registered as a RepeatInterceptor in the chunk -->
<bean id="hibernateTickerJobOutputSource"
      class="org.springframework.batch.io.support.HibernateAwareItemWriter">
    <property name="sessionFactory" ref="sessionFactoryCoreAb" />
    <property name="delegate" ref="subscriptionLineTickerJobWriter" />
</bean>


<bean id="subscriptionLineTickerJobWriter"
      class="no.aftenposten.accountbalancer.batch.outputsource.AccountingWriter" parent="accountingJdbcDao">
</bean>

<!--class="no.aftenposten.accountbalancer.batch.outputsource.AccountingWriter">-->
<!--<property name="dataSource" ref="oracleDataSource" />-->

<bean id="hibernateTickerJobInputSource" class="org.springframework.batch.io.cursor.JdbcCursorInputSource">
    <property name="dataSource" ref="pulsDataSource"/>
    <property name="mapper">
        <bean class="no.aftenposten.accountbalancer.batch.mapper.SubscriptionLineMapper"/>
    </property>
    <property name="sql" value="select * from SUBSCRIPTION_LINES where active = 1 and subscriptionLineId = 3"/>
</bean>


</beans>

Dave Syer
Jan 28th, 2008, 03:01 AM
It looks to me as if neither the input source nor its parent and grandparent is scope="step". This is probably just about OK in a standalone job, but in this case where you are re-launching multiple times in the same VM, what you get is the same input source coming back in second and subsequent runs. Not surprisingly then, it says it is exhausted at the beginning of those runs, because it was exhausted at the end of the first one. It needs to be closed and re-opened for each run, and the best way to do that is to make it scope="step".

Practically all input sources (a.k.a. ItemReaders) and output sources (a.k.a. ItemWriters) are stateful, and therefore require scope="step" so that Spring can manage their lifecycle. This will be simpler once we have an XML namespace, because you won't have to know that this is necessary for the common out-of-the-box components.
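Why a shared instance comes back exhausted can be shown without any Spring at all. The sketch below is hypothetical: `ListReader` stands in for a cursor-based input source such as JdbcCursorInputSource, and the code contrasts reusing one reader across runs (what a singleton bean gives you) with creating a new reader per run, which is roughly what scope="step" arranges.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration, not Spring Batch code. ListReader stands in for a
// stateful, cursor-based input source: once it has been read to the end it
// stays exhausted until a new instance (a new cursor) is created.
public class StepScopeDemo {

    static class ListReader {
        private final Iterator<String> cursor;

        ListReader(List<String> rows) {
            this.cursor = rows.iterator();
        }

        /** Returns the next row, or null when there is nothing left. */
        String read() {
            return cursor.hasNext() ? cursor.next() : null;
        }
    }

    /** One "job run": reads until exhausted and returns the number of items processed. */
    static int runJob(ListReader reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("row1", "row2");

        // Singleton-style: one reader shared by every run in the same JVM.
        ListReader shared = new ListReader(rows);
        System.out.println(runJob(shared)); // 2
        System.out.println(runJob(shared)); // 0 (already exhausted)

        // Step-scope-style: a fresh reader is created for each run.
        Supplier<ListReader> perRun = () -> new ListReader(rows);
        System.out.println(runJob(perRun.get())); // 2
        System.out.println(runJob(perRun.get())); // 2
    }
}
```

The second run of the shared reader processes nothing, which matches the symptom in this thread: the job "completes successfully" in a few milliseconds because its input is already at the end.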

geira
Jan 29th, 2008, 08:32 AM
I managed to get it working by reloading the input source: I fire up a new Spring context every time the job is started.

"parent" is the main Spring context the application is running in.
I name each job configuration XML file after the Spring Batch job name.
I have exposed the JobStarter as a JMX MBean via MBeanExporter.

Any thoughts about doing it this way?




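import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
// Spring Batch imports (JobLauncher, ExitStatus, NoSuchJobConfigurationException,
// JobExecutionAlreadyRunningException) omitted: their packages depend on the milestone in use.
// StartServer and JobStarter are application classes.

public class JobStarterImpl extends StartServer implements JobStarter {
    private static final Log log = LogFactory.getLog(JobStarterImpl.class);

    private JobLauncher launcher;

    public void setLauncher(JobLauncher launcher) {
        this.launcher = launcher;
    }

    public void run(final String jobName) {
        // Run in a background thread so the JMX operation returns immediately.
        new Thread(new Runnable() {
            public void run() {
                // Load a fresh child context (batch/jobs/<jobName>.xml) for this run, so the
                // stateful input/output sources are new instances. "parent" is the main
                // application context, inherited from StartServer. The child context is
                // never closed: the logs show jobs running on a SimpleAsyncTaskExecutor, so
                // closing it right after launcher.run() returns could be unsafe.
                log.info("Loading job xml: " + jobName);
                new ClassPathXmlApplicationContext(
                        new String[] { "batch/jobs/" + jobName + ".xml" }, parent);

                try {
                    log.info("Starting job-" + jobName);
                    ExitStatus status = launcher.run(jobName).getExitStatus();
                    // With an asynchronous launcher this may not yet be the final status.
                    log.info("Exit code: " + status.getExitCode());
                } catch (NoSuchJobConfigurationException e) {
                    log.error("No such configuration", e);
                } catch (JobExecutionAlreadyRunningException e) {
                    log.error("Job " + jobName + " is already running", e);
                }
            }
        }).start();
    }
}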