Question:
A Java SE solution, using the Spring framework, is being developed in order to parallelize the execution of a Stored Procedure in Oracle 9i, a Procedure that receives as a parameter one or more lines of a giant file to be processed.
This will be done through a thread pool, where each thread invokes the same Stored Procedure with different lines of the file as parameters, in order to reduce the processing time compared to a single execution of the Procedure. However, it is necessary to guarantee transactional atomicity across all executions/transactions of the Procedure, as is guaranteed today in a single execution: commit all the transactions at the end, and only if none of them reported an error. If any thread's execution fails, the transactions of all the others must be rolled back.
The logical choice seemed to be a JTA (XA) implementation in Spring, where each thread/transaction would be a resource participating in the two-phase commit. However, I believe the design of our solution violates the principle of the JTA mechanism, because as far as I understand it only guarantees atomicity over the resources used in the same transactional method, that is, within a single thread.
How can this be guaranteed in a non-programmatic way?
Answer:
Using just Atomikos, an implementation of JTA and XA, I made a simple example that lets you perform multithreaded processing within a single transaction.
The full project is available on my GitHub.
Implementation
First of all, we have the DataSource and TransactionManager initialization:
    // Atomikos implementations
    private static UserTransactionManager utm;
    private static AtomikosDataSourceBean adsb;

    // initialize resources
    public static void init() {
        utm = new UserTransactionManager();
        try {
            utm.init();
            adsb = new AtomikosDataSourceBean();
            adsb.setMaxPoolSize(20);
            adsb.setUniqueResourceName("postgres");
            adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
            Properties p = new Properties();
            p.setProperty("user", "postgres");
            p.setProperty("password", "0");
            p.setProperty("serverName", "localhost");
            p.setProperty("portNumber", "5432");
            p.setProperty("databaseName", "postgres");
            adsb.setXaProperties(p);
        } catch (SystemException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
Then, a thread that receives the instance of the main transaction (Transaction):
    private static class Processamento implements Callable<Integer> {
        private int id;
        private boolean falhar;
        private Transaction transaction;

        public Processamento(int id, boolean falhar, Transaction transaction) {
            this.falhar = falhar;
            this.transaction = transaction;
            this.id = id;
        }

        public Integer call() throws Exception {
            // simulate a failure before any work is done
            if (falhar) {
                throw new RuntimeException("Failed unexpectedly!");
            }
            // enlist xa connection in the shared transaction
            XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
            synchronized (transaction) {
                transaction.enlistResource(xac.getXAResource());
            }
            // normal execution, update row with OK
            Connection c = xac.getConnection();
            Statement s = c.createStatement();
            s.executeUpdate("update teste set processado = 'ok' where id = " + id);
            s.close();
            c.close();
            // delist xa connection
            synchronized (transaction) {
                transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
            }
            return id;
        }
    }
Note that instead of using JTA, I'm directly using the XA API implemented by Atomikos.
The call AtomikosDataSource.getDS().getXaDataSource().getXAConnection() retrieves an XA connection, which is added to the main transaction with the command transaction.enlistResource(xac.getXAResource()).
I synchronized some parts because I randomly got a NullPointerException in the tests, but this shouldn't be a problem if you use the connections with caution, that is, without opening and closing them all the time.
Finally, a method that starts five instances of the thread above:
    public static int processar(boolean falhar) {
        int ok = 0;
        Transaction transaction = null;
        try {
            // start transaction
            AtomikosDataSource.getTM().begin();
            transaction = AtomikosDataSource.getTM().getTransaction();
            // create thread pool
            ExecutorService executor = Executors.newFixedThreadPool(5);
            List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();
            // create 5 tasks, passing the main transaction as argument
            for (int i = 0; i < 5; i++) {
                processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
            }
            // execute tasks and wait for all of them to finish
            List<Future<Integer>> futures = executor.invokeAll(processos);
            // all tasks are done, so the pool threads can be released
            executor.shutdown();
            // count the results; get() will fail if the task threw an exception
            Throwable ex = null;
            for (Future<Integer> future : futures) {
                try {
                    int threadId = future.get();
                    System.out.println("Thread " + threadId + " succeeded!");
                    ok++;
                } catch (Throwable e) {
                    ex = e;
                }
            }
            if (ex != null) {
                throw ex;
            }
            // finish transaction normally
            transaction.commit();
        } catch (Throwable e) {
            e.printStackTrace();
            try {
                // try to rollback
                if (transaction != null) {
                    AtomikosDataSource.getTM().rollback();
                }
            } catch (IllegalStateException e1) {
                e1.printStackTrace();
            } catch (SecurityException e1) {
                e1.printStackTrace();
            } catch (SystemException e1) {
                e1.printStackTrace();
            }
        }
        return ok;
    }
I ran some tests in both a success and a failure scenario to validate the result.
In the success scenario, each of the five threads updates a row of the TESTE table with the value ok, and the transaction is committed.
In the failure scenario, the last thread always throws an exception, forcing the work of the other four threads to be rolled back.
See the code on GitHub for more details.
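The commit-or-rollback decision exercised by these two scenarios can also be tried without a database. The following is only a minimal sketch under that assumption: it replaces the XA transaction with an in-memory map of staged changes, and the names AllOrNothingSketch, TABLE, and process are hypothetical, not part of the GitHub project or of Atomikos.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, database-free simulation of the all-or-nothing pattern.
class AllOrNothingSketch {

    // Stands in for the TESTE table: id -> value of the "processado" column.
    static final Map<Integer, String> TABLE = new ConcurrentHashMap<>();

    // Runs `workers` tasks; the last one fails when `fail` is true.
    // Returns true if the staged changes were applied, false if they were discarded.
    static boolean process(int workers, boolean fail) {
        // Uncommitted changes shared by all workers (plays the role of the shared transaction).
        Map<Integer, String> staged = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= workers; i++) {
                final int id = i;
                final boolean failing = fail && i == workers;
                tasks.add(() -> {
                    if (failing) {
                        throw new IllegalStateException("worker " + id + " failed");
                    }
                    staged.put(id, "ok");
                    return id;
                });
            }
            // invokeAll blocks until every task has completed or thrown.
            boolean anyFailed = false;
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    anyFailed = true;
                }
            }
            if (anyFailed) {
                return false; // "rollback": staged changes are simply dropped
            }
            TABLE.putAll(staged); // "commit": changes become visible
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // treat interruption as a failure, nothing is applied
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failure run applied: " + process(5, true) + ", rows: " + TABLE.size());
        System.out.println("success run applied: " + process(5, false) + ", rows: " + TABLE.size());
    }
}
```

The point of the sketch is only the control flow: the coordinator waits for every worker, and a single failure is enough to discard everything the other workers did. It does not reproduce two-phase commit or any XA behavior.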
Notes on configuration
I used PostgreSQL in the example. It was necessary to set max_prepared_transactions to a value greater than zero in the postgresql.conf configuration file.
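For reference, the relevant fragment of postgresql.conf might look like the following. The value 20 is an assumption chosen here only to match the pool size used in the example (setMaxPoolSize(20)); the server has to be restarted for a change to this setting to take effect.

```conf
# postgresql.conf
# Illustrative value (assumption): at least the number of XA connections
# that may be in the prepared state at the same time.
max_prepared_transactions = 20
```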
It is important to verify that your database driver supports distributed transactions. I read somewhere that MySQL might have some issues with this.
Comments
To make the example work with Spring, just declare the manually created classes as beans, either in XML or through annotations. It's up to you.
Be careful if you decide to implement something like this inside an Application Server, so as not to interfere with the normal transactions of the system.
Personally, I don't see a real need for parallel processing within the same transaction. It is much more efficient to break up processing into transactional blocks. There are several techniques for doing this without making the system state inconsistent, for example using additional columns in the table or even an extra table.
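As a rough illustration of the block-based alternative, the sketch below splits the lines into fixed-size blocks and records an outcome per block, which is the role an additional status column or control table would play. It runs in memory only; the names BlockProcessingSketch, split, and processAll and the DONE/FAILED values are hypothetical, and in a real system each call to work.accept would correspond to one database transaction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: independent transactional blocks with a per-block status.
class BlockProcessingSketch {

    // Splits the lines into consecutive blocks of at most blockSize lines.
    static List<List<String>> split(List<String> lines, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<List<String>> blocks = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += blockSize) {
            int end = Math.min(start + blockSize, lines.size());
            blocks.add(new ArrayList<>(lines.subList(start, end)));
        }
        return blocks;
    }

    // Processes each block on its own and records the outcome by block index.
    // The returned map plays the role of the extra status column or table.
    static Map<Integer, String> processAll(List<List<String>> blocks,
                                           Consumer<List<String>> work) {
        Map<Integer, String> status = new LinkedHashMap<>();
        for (int i = 0; i < blocks.size(); i++) {
            try {
                work.accept(blocks.get(i)); // would be one transaction per block
                status.put(i, "DONE");
            } catch (RuntimeException e) {
                status.put(i, "FAILED"); // only this block has to be retried
            }
        }
        return status;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            lines.add("line " + i);
        }
        Map<Integer, String> status = processAll(split(lines, 2), block -> {
            if (block.contains("line 3")) {
                throw new IllegalStateException("simulated failure");
            }
        });
        System.out.println(status);
    }
}
```

With this approach a failure leaves earlier blocks committed, so the recorded status is what lets a later run retry or compensate only the failed blocks instead of rolling back everything.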