View Javadoc

1   /**
2    *
3    *  Licensed to the Apache Software Foundation (ASF) under one or more
4    *  contributor license agreements.  See the NOTICE file distributed with
5    *  this work for additional information regarding copyright ownership.
6    *  The ASF licenses this file to You under the Apache License, Version 2.0
7    *  (the "License"); you may not use this file except in compliance with
8    *  the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing, software
13   *  distributed under the License is distributed on an "AS IS" BASIS,
14   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *  See the License for the specific language governing permissions and
16   *  limitations under the License.
17   */
18  
19  package org.apache.geronimo.transaction.manager;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  
28  import javax.transaction.HeuristicMixedException;
29  import javax.transaction.HeuristicRollbackException;
30  import javax.transaction.InvalidTransactionException;
31  import javax.transaction.NotSupportedException;
32  import javax.transaction.RollbackException;
33  import javax.transaction.Status;
34  import javax.transaction.Synchronization;
35  import javax.transaction.SystemException;
36  import javax.transaction.Transaction;
37  import javax.transaction.TransactionManager;
38  import javax.transaction.UserTransaction;
39  import javax.transaction.xa.XAException;
40  import javax.transaction.xa.Xid;
41  
42  import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
43  import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.geronimo.transaction.log.UnrecoverableLog;
47  
48  /**
49   * Simple implementation of a transaction manager.
50   *
51   * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
52   */
53  public class TransactionManagerImpl implements TransactionManager, UserTransaction, XidImporter, MonitorableTransactionManager {
54      private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
55      protected static final int DEFAULT_TIMEOUT = 600;
56      protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
57  
58      final TransactionLog transactionLog;
59      final XidFactory xidFactory;
60      private final int defaultTransactionTimeoutMilliseconds;
61      private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
62      private final ThreadLocal threadTx = new ThreadLocal();
63      private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
64      private static final Log recoveryLog = LogFactory.getLog("RecoveryController");
65      final Recovery recovery;
66      final Collection resourceManagers;
67      private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
68      private List recoveryErrors = new ArrayList();
69  
70      public TransactionManagerImpl() throws XAException {
71          this(DEFAULT_TIMEOUT,
72                  null,
73                  null,
74                  null);
75      }
76  
77      public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
78          this(defaultTransactionTimeoutSeconds,
79                  null,
80                  null,
81                  null);
82      }
83  
84      public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
85          this(defaultTransactionTimeoutSeconds,
86                  null,
87                  transactionLog,
88                  null);
89      }
90  
91      public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog, Collection resourceManagers) throws XAException {
92          if (defaultTransactionTimeoutSeconds <= 0) {
93              throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
94          }
95  
96          this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
97  
98          if (transactionLog == null) {
99              this.transactionLog = new UnrecoverableLog();
100         } else {
101             this.transactionLog = transactionLog;
102         }
103 
104         if (xidFactory != null) {
105             this.xidFactory = xidFactory;
106         } else {
107             this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
108         }
109 
110         this.resourceManagers = resourceManagers;
111         recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
112 
113         if (resourceManagers != null) {
114             recovery.recoverLog();
115             List copy = watchResourceManagers(resourceManagers);
116             for (Iterator iterator = copy.iterator(); iterator.hasNext();) {
117                 ResourceManager resourceManager = (ResourceManager) iterator.next();
118                 recoverResourceManager(resourceManager);
119             }
120         }
121     }
122 
123     protected List watchResourceManagers(Collection resourceManagers) {
124         return new ArrayList(resourceManagers);
125     }
126 
127     public Transaction getTransaction() {
128         return (Transaction) threadTx.get();
129     }
130 
131     private void associate(TransactionImpl tx) throws InvalidTransactionException {
132         if (tx == null) throw new NullPointerException("tx is null");
133 
134         Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
135         if (existingAssociation != null) {
136             throw new InvalidTransactionException("Specified transaction is already associated with another thread");
137         }
138         threadTx.set(tx);
139         fireThreadAssociated(tx);
140     }
141 
142     private void unassociate() {
143         Transaction tx = getTransaction();
144         if (tx != null) {
145             associatedTransactions.remove(tx);
146             threadTx.set(null);
147             fireThreadUnassociated(tx);
148         }
149     }
150 
151     public void setTransactionTimeout(int seconds) throws SystemException {
152         if (seconds < 0) {
153             throw new SystemException("transaction timeout must be positive or 0 to reset to default");
154         }
155         if (seconds == 0) {
156             transactionTimeoutMilliseconds.set(null);
157         } else {
158             transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
159         }
160     }
161 
162     public int getStatus() throws SystemException {
163         Transaction tx = getTransaction();
164         return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
165     }
166 
167     public void begin() throws NotSupportedException, SystemException {
168         begin(getTransactionTimeoutMilliseconds(0L));
169     }
170 
171     public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
172         if (getStatus() != Status.STATUS_NO_TRANSACTION) {
173             throw new NotSupportedException("Nested Transactions are not supported");
174         }
175         TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
176 //        timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
177         try {
178             associate(tx);
179         } catch (InvalidTransactionException e) {
180             // should not be possible since we just created that transaction and no one has a reference yet
181             throw new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction");
182         }
183         // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
184         this.transactionTimeoutMilliseconds.set(null);
185         return tx;
186     }
187 
188     public Transaction suspend() throws SystemException {
189         Transaction tx = getTransaction();
190         if (tx != null) {
191             unassociate();
192         }
193         return tx;
194     }
195 
196     public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
197         if (getTransaction() != null) {
198             throw new IllegalStateException("Thread already associated with another transaction");
199         }
200         if (!(tx instanceof TransactionImpl)) {
201             throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
202         }
203         associate((TransactionImpl) tx);
204     }
205 
206     public Object getResource(Object key) {
207         TransactionImpl tx = getActiveTransactionImpl();
208         return tx.getResource(key);
209     }
210 
211     private TransactionImpl getActiveTransactionImpl() {
212         TransactionImpl tx = (TransactionImpl)threadTx.get();
213         if (tx == null) {
214             throw new IllegalStateException("No tx on thread");
215         }
216         if (tx.getStatus() != Status.STATUS_ACTIVE) {
217             throw new IllegalStateException("Transaction " + tx + " is not active");
218         }
219         return tx;
220     }
221 
222     public boolean getRollbackOnly() {
223         TransactionImpl tx = getActiveTransactionImpl();
224         return tx.getRollbackOnly();
225     }
226 
227     public Object getTransactionKey() {
228         TransactionImpl tx = getActiveTransactionImpl();
229         return tx.getTransactionKey();
230     }
231 
232     public int getTransactionStatus() {
233         TransactionImpl tx = getActiveTransactionImpl();
234         return tx.getTransactionStatus();
235     }
236 
237     public void putResource(Object key, Object value) {
238         TransactionImpl tx = getActiveTransactionImpl();
239         tx.putResource(key, value);
240     }
241 
242     /**
243      * jta 1.1 method so the jpa implementations can be told to flush their caches.
244      * @param synchronization
245      */
246     public void registerInterposedSynchronization(Synchronization synchronization) {
247         TransactionImpl tx = getActiveTransactionImpl();
248         tx.registerInterposedSynchronization(synchronization);
249     }
250 
251     public void setRollbackOnly() throws IllegalStateException {
252         TransactionImpl tx = (TransactionImpl) threadTx.get();
253         if (tx == null) {
254             throw new IllegalStateException("No transaction associated with current thread");
255         }
256         tx.setRollbackOnly();
257     }
258 
259     public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
260         Transaction tx = getTransaction();
261         if (tx == null) {
262             throw new IllegalStateException("No transaction associated with current thread");
263         }
264         try {
265             tx.commit();
266         } finally {
267             unassociate();
268         }
269     }
270 
271     public void rollback() throws IllegalStateException, SecurityException, SystemException {
272         Transaction tx = getTransaction();
273         if (tx == null) {
274             throw new IllegalStateException("No transaction associated with current thread");
275         }
276         try {
277             tx.rollback();
278         } finally {
279             unassociate();
280         }
281     }
282 
283     //XidImporter implementation
284     public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
285         if (transactionTimeoutMilliseconds < 0) {
286             throw new SystemException("transaction timeout must be positive or 0 to reset to default");
287         }
288         TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
289         return tx;
290     }
291 
292     public void commit(Transaction tx, boolean onePhase) throws XAException {
293         if (onePhase) {
294             try {
295                 tx.commit();
296             } catch (HeuristicMixedException e) {
297                 throw (XAException) new XAException().initCause(e);
298             } catch (HeuristicRollbackException e) {
299                 throw (XAException) new XAException().initCause(e);
300             } catch (RollbackException e) {
301                 throw (XAException) new XAException().initCause(e);
302             } catch (SecurityException e) {
303                 throw (XAException) new XAException().initCause(e);
304             } catch (SystemException e) {
305                 throw (XAException) new XAException().initCause(e);
306             }
307         } else {
308             try {
309                 ((TransactionImpl) tx).preparedCommit();
310             } catch (SystemException e) {
311                 throw (XAException) new XAException().initCause(e);
312             }
313         }
314     }
315 
316     public void forget(Transaction tx) throws XAException {
317         //TODO implement this!
318     }
319 
320     public int prepare(Transaction tx) throws XAException {
321         try {
322             return ((TransactionImpl) tx).prepare();
323         } catch (SystemException e) {
324             throw (XAException) new XAException().initCause(e);
325         } catch (RollbackException e) {
326             throw (XAException) new XAException().initCause(e);
327         }
328     }
329 
330     public void rollback(Transaction tx) throws XAException {
331         try {
332             tx.rollback();
333         } catch (IllegalStateException e) {
334             throw (XAException) new XAException().initCause(e);
335         } catch (SystemException e) {
336             throw (XAException) new XAException().initCause(e);
337         }
338     }
339 
340     long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
341         if (transactionTimeoutMilliseconds != 0) {
342             return transactionTimeoutMilliseconds;
343         }
344         Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
345         if (timeout != null) {
346             return timeout.longValue();
347         }
348         return defaultTransactionTimeoutMilliseconds;
349     }
350 
351     protected void recoverResourceManager(ResourceManager resourceManager) {
352         NamedXAResource namedXAResource;
353         try {
354             namedXAResource = resourceManager.getRecoveryXAResources();
355         } catch (SystemException e) {
356             recoveryLog.error(e);
357             recoveryErrors.add(e);
358             return;
359         }
360         if (namedXAResource != null) {
361             try {
362                 recovery.recoverResourceManager(namedXAResource);
363             } catch (XAException e) {
364                 recoveryLog.error(e);
365                 recoveryErrors.add(e);
366             } finally {
367                 resourceManager.returnResource(namedXAResource);
368             }
369         }
370     }
371 
372 
373     public Map getExternalXids() {
374         return new HashMap(recovery.getExternalXids());
375     }
376 
377     public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
378         transactionAssociationListeners.addIfAbsent(listener);
379     }
380 
381     public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
382         transactionAssociationListeners.remove(listener);
383     }
384 
385     protected void fireThreadAssociated(Transaction tx) {
386         for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
387             TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
388             try {
389                 listener.threadAssociated(tx);
390             } catch (Exception e) {
391                 log.warn("Error calling transaction association listener", e);
392             }
393         }
394     }
395 
396     protected void fireThreadUnassociated(Transaction tx) {
397         for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
398             TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
399             try {
400                 listener.threadUnassociated(tx);
401             } catch (Exception e) {
402                 log.warn("Error calling transaction association listener", e);
403             }
404         }
405     }
406 }