View Javadoc

1   /**
2    *  Licensed to the Apache Software Foundation (ASF) under one or more
3    *  contributor license agreements.  See the NOTICE file distributed with
4    *  this work for additional information regarding copyright ownership.
5    *  The ASF licenses this file to You under the Apache License, Version 2.0
6    *  (the "License"); you may not use this file except in compliance with
7    *  the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   *  Unless required by applicable law or agreed to in writing, software
12   *  distributed under the License is distributed on an "AS IS" BASIS,
13   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   *  See the License for the specific language governing permissions and
15   *  limitations under the License.
16   */
17  
18  package org.apache.geronimo.transaction.manager;
19  
20  import java.util.ArrayList;
21  import java.util.HashMap;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Map;
25  
26  import javax.transaction.*;
27  import javax.transaction.xa.XAException;
28  import javax.transaction.xa.Xid;
29  
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.CopyOnWriteArrayList;
32  import java.util.concurrent.atomic.AtomicLong;
33  import org.apache.geronimo.transaction.log.UnrecoverableLog;
34  import org.slf4j.Logger;
35  import org.slf4j.LoggerFactory;
36  
37  /**
38   * Simple implementation of a transaction manager.
39   *
40   * @version $Rev: 727065 $ $Date: 2008-12-16 10:22:22 -0500 (Tue, 16 Dec 2008) $
41   */
42  public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager {
43      private static final Logger log = LoggerFactory.getLogger(TransactionManagerImpl.class);
44      protected static final int DEFAULT_TIMEOUT = 600;
45      protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
46  
47      final TransactionLog transactionLog;
48      final XidFactory xidFactory;
49      private final int defaultTransactionTimeoutMilliseconds;
50      private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
51      private final ThreadLocal threadTx = new ThreadLocal();
52      private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
53      private static final Logger recoveryLog = LoggerFactory.getLogger("RecoveryController");
54      final Recovery recovery;
55      private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
56      private List recoveryErrors = new ArrayList();
57      // statistics
58      private AtomicLong totalCommits = new AtomicLong(0);
59      private AtomicLong totalRollBacks = new AtomicLong(0);
60      private AtomicLong activeCount = new AtomicLong(0);
61  
62      public TransactionManagerImpl() throws XAException {
63          this(DEFAULT_TIMEOUT,
64                  null,
65                  null
66          );
67      }
68  
69      public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
70          this(defaultTransactionTimeoutSeconds,
71                  null,
72                  null
73          );
74      }
75  
76      public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
77          this(defaultTransactionTimeoutSeconds,
78                  null,
79                  transactionLog
80          );
81      }
82  
83      public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException {
84          if (defaultTransactionTimeoutSeconds <= 0) {
85              throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
86          }
87          this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
88  
89          if (transactionLog == null) {
90              this.transactionLog = new UnrecoverableLog();
91          } else {
92              this.transactionLog = transactionLog;
93          }
94  
95          if (xidFactory != null) {
96              this.xidFactory = xidFactory;
97          } else {
98              this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
99          }
100 
101         recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
102         recovery.recoverLog();
103     }
104 
105     public Transaction getTransaction() {
106         return (Transaction) threadTx.get();
107     }
108 
109     private void associate(TransactionImpl tx) throws InvalidTransactionException {
110         if (tx.getStatus() == Status.STATUS_NO_TRANSACTION) {
111             throw new InvalidTransactionException("Cannot resume invalid transaction: " + tx);
112         } else {
113             Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
114             if (existingAssociation != null) {
115                 throw new InvalidTransactionException("Specified transaction is already associated with another thread");
116             }
117             threadTx.set(tx);
118             fireThreadAssociated(tx);
119             activeCount.getAndIncrement();
120         }
121     } 
122 
123     private void unassociate() {
124         Transaction tx = getTransaction();
125         if (tx != null) {
126             associatedTransactions.remove(tx);
127             threadTx.set(null);
128             fireThreadUnassociated(tx);
129             activeCount.getAndDecrement();
130         }
131     }
132 
133     public void setTransactionTimeout(int seconds) throws SystemException {
134         if (seconds < 0) {
135             throw new SystemException("transaction timeout must be positive or 0 to reset to default");
136         }
137         if (seconds == 0) {
138             transactionTimeoutMilliseconds.set(null);
139         } else {
140             transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
141         }
142     }
143 
144     public int getStatus() throws SystemException {
145         Transaction tx = getTransaction();
146         return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
147     }
148 
149     public void begin() throws NotSupportedException, SystemException {
150         begin(getTransactionTimeoutMilliseconds(0L));
151     }
152 
153     public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
154         if (getStatus() != Status.STATUS_NO_TRANSACTION) {
155             throw new NotSupportedException("Nested Transactions are not supported");
156         }
157         TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
158 //        timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
159         try {
160             associate(tx);
161         } catch (InvalidTransactionException e) {
162             // should not be possible since we just created that transaction and no one has a reference yet
163             throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e);
164         }
165         // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
166         this.transactionTimeoutMilliseconds.set(null);
167         return tx;
168     }
169 
170     public Transaction suspend() throws SystemException {
171         Transaction tx = getTransaction();
172         if (tx != null) {
173             unassociate();
174         }
175         return tx;
176     }
177 
178     public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
179         if (getTransaction() != null && tx != getTransaction()) {
180             throw new IllegalStateException("Thread already associated with another transaction");
181         }
182         if (tx != null && tx != getTransaction()) {
183             if (!(tx instanceof TransactionImpl)) {
184                 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
185             }
186             
187             associate((TransactionImpl) tx);
188         }
189     }
190 
191     public Object getResource(Object key) {
192         TransactionImpl tx = getActiveTransactionImpl();
193         return tx.getResource(key);
194     }
195 
196     private TransactionImpl getActiveTransactionImpl() {
197         TransactionImpl tx = (TransactionImpl)threadTx.get();
198         if (tx == null) {
199             throw new IllegalStateException("No tx on thread");
200         }
201         if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
202             throw new IllegalStateException("Transaction " + tx + " is not active");
203         }
204         return tx;
205     }
206 
207     public boolean getRollbackOnly() {
208         TransactionImpl tx = getActiveTransactionImpl();
209         return tx.getRollbackOnly();
210     }
211 
212     public Object getTransactionKey() {
213     	TransactionImpl tx = (TransactionImpl) getTransaction();
214         return tx == null ? null: tx.getTransactionKey();
215     }
216 
217     public int getTransactionStatus() {
218         TransactionImpl tx = (TransactionImpl) getTransaction();
219         return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus();
220     }
221 
222     public void putResource(Object key, Object value) {
223         TransactionImpl tx = getActiveTransactionImpl();
224         tx.putResource(key, value);
225     }
226 
227     /**
228      * jta 1.1 method so the jpa implementations can be told to flush their caches.
229      * @param synchronization
230      */
231     public void registerInterposedSynchronization(Synchronization synchronization) {
232         TransactionImpl tx = getActiveTransactionImpl();
233         tx.registerInterposedSynchronization(synchronization);
234     }
235 
236     public void setRollbackOnly() throws IllegalStateException {
237         TransactionImpl tx = (TransactionImpl) threadTx.get();
238         if (tx == null) {
239             throw new IllegalStateException("No transaction associated with current thread");
240         }
241         tx.setRollbackOnly();
242     }
243 
244     public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
245         Transaction tx = getTransaction();
246         if (tx == null) {
247             throw new IllegalStateException("No transaction associated with current thread");
248         }
249         try {
250             tx.commit();
251         } finally {
252             unassociate();
253         }
254         totalCommits.getAndIncrement();
255     }
256 
257     public void rollback() throws IllegalStateException, SecurityException, SystemException {
258         Transaction tx = getTransaction();
259         if (tx == null) {
260             throw new IllegalStateException("No transaction associated with current thread");
261         }
262         try {
263             tx.rollback();
264         } finally {
265             unassociate();
266         }
267         totalRollBacks.getAndIncrement();
268     }
269 
270     //XidImporter implementation
271     public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
272         if (transactionTimeoutMilliseconds < 0) {
273             throw new SystemException("transaction timeout must be positive or 0 to reset to default");
274         }
275         TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
276         return tx;
277     }
278 
279     public void commit(Transaction tx, boolean onePhase) throws XAException {
280         if (onePhase) {
281             try {
282                 tx.commit();
283             } catch (HeuristicMixedException e) {
284                 throw (XAException) new XAException().initCause(e);
285             } catch (HeuristicRollbackException e) {
286                 throw (XAException) new XAException().initCause(e);
287             } catch (RollbackException e) {
288                 throw (XAException) new XAException().initCause(e);
289             } catch (SecurityException e) {
290                 throw (XAException) new XAException().initCause(e);
291             } catch (SystemException e) {
292                 throw (XAException) new XAException().initCause(e);
293             }
294         } else {
295             try {
296                 ((TransactionImpl) tx).preparedCommit();
297             } catch (HeuristicMixedException e) {
298                 throw (XAException) new XAException().initCause(e);
299             } catch (HeuristicRollbackException e) {
300                 throw (XAException) new XAException().initCause(e);
301             } catch (SystemException e) {
302                 throw (XAException) new XAException().initCause(e);
303             }
304         }
305         totalCommits.getAndIncrement();
306     }
307 
308     public void forget(Transaction tx) throws XAException {
309         //TODO implement this!
310     }
311 
312     public int prepare(Transaction tx) throws XAException {
313         try {
314             return ((TransactionImpl) tx).prepare();
315         } catch (SystemException e) {
316             throw (XAException) new XAException().initCause(e);
317         } catch (RollbackException e) {
318             throw (XAException) new XAException().initCause(e);
319         }
320     }
321 
322     public void rollback(Transaction tx) throws XAException {
323         try {
324             tx.rollback();
325         } catch (IllegalStateException e) {
326             throw (XAException) new XAException().initCause(e);
327         } catch (SystemException e) {
328             throw (XAException) new XAException().initCause(e);
329         }
330         totalRollBacks.getAndIncrement();
331     }
332 
333     long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
334         if (transactionTimeoutMilliseconds != 0) {
335             return transactionTimeoutMilliseconds;
336         }
337         Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
338         if (timeout != null) {
339             return timeout.longValue();
340         }
341         return defaultTransactionTimeoutMilliseconds;
342     }
343 
344     //Recovery
345     public void recoveryError(Exception e) {
346         recoveryLog.error("Recovery error", e);
347         recoveryErrors.add(e);
348     }
349 
350     public void recoverResourceManager(NamedXAResource xaResource) {
351         try {
352             recovery.recoverResourceManager(xaResource);
353         } catch (XAException e) {
354             recoveryError(e);
355         }
356     }
357 
358     public Map getExternalXids() {
359         return new HashMap(recovery.getExternalXids());
360     }
361 
362     public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
363         transactionAssociationListeners.addIfAbsent(listener);
364     }
365 
366     public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
367         transactionAssociationListeners.remove(listener);
368     }
369 
370     protected void fireThreadAssociated(Transaction tx) {
371         for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
372             TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
373             try {
374                 listener.threadAssociated(tx);
375             } catch (Exception e) {
376                 log.warn("Error calling transaction association listener", e);
377             }
378         }
379     }
380 
381     protected void fireThreadUnassociated(Transaction tx) {
382         for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
383             TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
384             try {
385                 listener.threadUnassociated(tx);
386             } catch (Exception e) {
387                 log.warn("Error calling transaction association listener", e);
388             }
389         }
390     }
391 
392     /**
393      * Returns the number of active transactions.
394      */
395     public long getActiveCount() {
396         return activeCount.longValue();
397     }
398 
399     /**
400      * Return the number of total commits
401      */
402     public long getTotalCommits() {
403         return totalCommits.longValue();
404     }
405 
406     /**
407      * Returns the number of total rollbacks
408      */
409     public long getTotalRollbacks() {
410         return totalRollBacks.longValue();
411     }
412 
413     /**
414      * Reset statistics
415      */
416     public void resetStatistics() {
417         totalCommits.getAndSet(0);
418         totalRollBacks.getAndSet(0);
419     }
420 }