Atomikos 3.8.0, MySQL 5.1.22, Spring 3.2.0
Hello,
I have the following context XML for JTA, Atomikos and two Atomikos DataSource beans (batch and uymDS):
Code:
<bean id="setMyAtomikosSystemProps"
      class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
    <property name="targetObject">
        <!-- System.getProperties() -->
        <bean class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
            <property name="targetClass" value="java.lang.System" />
            <property name="targetMethod" value="getProperties" />
        </bean>
    </property>
    <property name="targetMethod" value="putAll" />
    <property name="arguments">
        <!-- The new Properties -->
        <util:properties>
            <prop key="com.atomikos.icatch.file">/opt/etl/monitorPrepFtpFilesMvc/jta.properties</prop>
            <prop key="com.atomikos.icatch.hide_init_file_path">true</prop>
        </util:properties>
    </property>
</bean>

<bean id="userTransactionService"
      class="com.atomikos.icatch.config.UserTransactionServiceImp"
      init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <!-- IMPORTANT: specify all Atomikos properties here -->
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
        </props>
    </constructor-arg>
    <!--
    <property name="initialLogAdministrators">
        <list>
            <ref bean="localLogAdministrator" />
        </list>
    </property>
    -->
</bean>

<bean id="atomikosTransactionManager"
      class="com.atomikos.icatch.jta.UserTransactionManager"
      init-method="init" destroy-method="close"
      depends-on="userTransactionService">
    <!-- IMPORTANT: disable startup because the userTransactionService above does this -->
    <property name="startupTransactionService" value="false" />
    <!-- when close is called, should we force transactions to terminate or not? -->
    <property name="forceShutdown" value="false" />
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction"
      class="com.atomikos.icatch.jta.UserTransactionImp"
      depends-on="userTransactionService">
    <property name="transactionTimeout" value="300" />
</bean>

<bean id="transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager"
      depends-on="userTransactionService">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
    <property name="allowCustomIsolationLevels" value="true" />
</bean>

<!-- batch -->
<util:properties id="batchDbProperties" location="classpath:/META-INF/batchdb.properties" />
<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
      init-method="init" destroy-method="close"
      depends-on="setMyAtomikosSystemProps">
    <property name="uniqueResourceName" value="dataSource" />
    <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
    <property name="xaProperties" ref="batchDbProperties" />
    <property name="minPoolSize" value="10" />
    <property name="maxPoolSize" value="20" />
    <property name="borrowConnectionTimeout" value="30" />
    <property name="testQuery" value="select 1" />
    <property name="maintenanceInterval" value="60" />
</bean>

<!-- uym -->
<util:properties id="uymDbProperties" location="classpath:/META-INF/uymdb.properties" />
<bean id="uymDS" class="com.atomikos.jdbc.AtomikosDataSourceBean"
      init-method="init" destroy-method="close"
      depends-on="setMyAtomikosSystemProps">
    <property name="uniqueResourceName" value="uymDS" />
    <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
    <property name="xaProperties" ref="uymDbProperties" />
    <property name="minPoolSize" value="10" />
    <property name="maxPoolSize" value="20" />
    <property name="borrowConnectionTimeout" value="30" />
    <property name="testQuery" value="select 1" />
    <property name="maintenanceInterval" value="60" />
</bean>
The Dao bean is
Code:
<bean id="uymFileJdbcDao" class="com.xxx.domain.uym.UymFileJdbcDao">
    <property name="dataSource" ref="uymDS" />
</bean>
and the Dao implementation and pertinent code is
Code:
public class UymFileJdbcDao extends JdbcDaoSupport implements UymFileDao {

    static final Logger logger = LoggerFactory.getLogger(UymFileJdbcDao.class);

    private static final String[] COLUMNNAMES = {
        "history_log_id", "da_org_id", "abbr", "name", "path",
        "mtime", "size", "type", "extension", "row_cnt", "status"
    };

    private static final String table = " file ";

    private static final String SQL_INSERT = "INSERT INTO" + table + "("
            + "history_log_id, da_org_id, abbr, name, path, "
            + "mtime, size, type, extension, row_cnt, "
            + "status) "
            + "VALUES ("
            + "?,?,?,?,?,?,?,?,?,?,"
            + "?"
            + ")";

    private RowMapper<UymFile> rowMapper = new UymFileRowMapper();

    public long insertRecord(UymFile record) {
        KeyHolder keyHolder = new GeneratedKeyHolder();
        final Integer getHistoryLogId = record.getHistoryLogId();
        final Integer getDaOrgId = record.getDaOrgId();
        final String getAbbr = record.getAbbr();
        final String getName = record.getName();
        final String getPath = record.getPath();
        final String getMtime = record.getMtime();
        final String getSize = record.getSize();
        final String getType = record.getType();
        final String getExtension = record.getExtension();
        final Integer getRowCnt = record.getRowCnt();
        final Integer getStatus = record.getStatus();

        int row = getJdbcTemplate().update(new PreparedStatementCreator() {
            public PreparedStatement createPreparedStatement(Connection conn) throws SQLException {
                PreparedStatement ps = conn.prepareStatement(SQL_INSERT, COLUMNNAMES);
                ps.setInt(1, getHistoryLogId);
                ps.setInt(2, getDaOrgId);
                ps.setString(3, getAbbr);
                ps.setString(4, getName);
                ps.setString(5, getPath);
                ps.setString(6, getMtime);
                ps.setString(7, getSize);
                ps.setString(8, getType);
                ps.setString(9, getExtension);
                ps.setInt(10, getRowCnt);
                ps.setInt(11, getStatus);
                return ps;
            }
        }, keyHolder);

        long rowId = 0;
        if (row > 0)
            rowId = keyHolder.getKey().longValue();
        return rowId;
    }
}
The job step is
Code:
<batch:step id="step7">
    <batch:tasklet>
        <batch:chunk reader="filePathItemReader"
                     processor="filePathItemProcessor"
                     writer="filePathItemWriter"
                     commit-interval="1" />
        <batch:listeners>
            <batch:listener ref="stepExecutionListener" />
        </batch:listeners>
    </batch:tasklet>
</batch:step>
So for each file collected it will read in the file details, convert it to a UymFile and write it to the database.
The filePathItemWriter has:
Code:
@Override
public void write(List<? extends UymFile> items) throws Exception {
    for (UymFile file : items) {
        long newId = dao.insertRecord(file);
        updateRecordWithFile(file, newId);
    }
}

private void updateRecordWithFile(UymFile file, long Id) throws SQLException, IOException {
    Connection conn = dao.getDataSource().getConnection();
    File filepath = new File(file.getPath());
    fis = new FileInputStream(filepath);
    ps = conn.prepareStatement(INSERT_FILE_SQL);
    ps.setBinaryStream(1, fis, (int) filepath.length());
    ps.setLong(2, Id);
    ps.executeUpdate();
    ps.close();
    fis.close();
    System.out.println("In the writer... " + file.getName() + " id: " + Id);
}
If I don't include the updateRecordWithFile call, I do not get the java.sql.SQLException: Lock wait timeout exceeded; try restarting transaction.
My understanding was that with JTA as the transaction manager, the Atomikos beans, MySQL's default of autocommit enabled, and commit-interval="1", a commit would occur after each insert / update. I have not added @Transactional anywhere; I am leaving transaction demarcation to JTA.
Please advise - thanks.


