001    /**
002     *
003     *  Licensed to the Apache Software Foundation (ASF) under one or more
004     *  contributor license agreements.  See the NOTICE file distributed with
005     *  this work for additional information regarding copyright ownership.
006     *  The ASF licenses this file to You under the Apache License, Version 2.0
007     *  (the "License"); you may not use this file except in compliance with
008     *  the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     *  Unless required by applicable law or agreed to in writing, software
013     *  distributed under the License is distributed on an "AS IS" BASIS,
014     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     *  See the License for the specific language governing permissions and
016     *  limitations under the License.
017     */
018    
019    package org.apache.geronimo.transaction.manager;
020    
021    import java.util.ArrayList;
022    import java.util.Collection;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.List;
026    import java.util.Map;
027    
028    import javax.transaction.HeuristicMixedException;
029    import javax.transaction.HeuristicRollbackException;
030    import javax.transaction.InvalidTransactionException;
031    import javax.transaction.NotSupportedException;
032    import javax.transaction.RollbackException;
033    import javax.transaction.Status;
034    import javax.transaction.Synchronization;
035    import javax.transaction.SystemException;
036    import javax.transaction.Transaction;
037    import javax.transaction.TransactionManager;
038    import javax.transaction.UserTransaction;
039    import javax.transaction.xa.XAException;
040    import javax.transaction.xa.Xid;
041    
042    import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
043    import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
044    import org.apache.commons.logging.Log;
045    import org.apache.commons.logging.LogFactory;
046    import org.apache.geronimo.transaction.log.UnrecoverableLog;
047    
048    /**
049     * Simple implementation of a transaction manager.
050     *
051     * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
052     */
053    public class TransactionManagerImpl implements TransactionManager, UserTransaction, XidImporter, MonitorableTransactionManager {
054        private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
055        protected static final int DEFAULT_TIMEOUT = 600;
056        protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
057    
058        final TransactionLog transactionLog;
059        final XidFactory xidFactory;
060        private final int defaultTransactionTimeoutMilliseconds;
061        private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
062        private final ThreadLocal threadTx = new ThreadLocal();
063        private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
064        private static final Log recoveryLog = LogFactory.getLog("RecoveryController");
065        final Recovery recovery;
066        final Collection resourceManagers;
067        private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
068        private List recoveryErrors = new ArrayList();
069    
070        public TransactionManagerImpl() throws XAException {
071            this(DEFAULT_TIMEOUT,
072                    null,
073                    null,
074                    null);
075        }
076    
077        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
078            this(defaultTransactionTimeoutSeconds,
079                    null,
080                    null,
081                    null);
082        }
083    
084        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
085            this(defaultTransactionTimeoutSeconds,
086                    null,
087                    transactionLog,
088                    null);
089        }
090    
091        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog, Collection resourceManagers) throws XAException {
092            if (defaultTransactionTimeoutSeconds <= 0) {
093                throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
094            }
095    
096            this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
097    
098            if (transactionLog == null) {
099                this.transactionLog = new UnrecoverableLog();
100            } else {
101                this.transactionLog = transactionLog;
102            }
103    
104            if (xidFactory != null) {
105                this.xidFactory = xidFactory;
106            } else {
107                this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
108            }
109    
110            this.resourceManagers = resourceManagers;
111            recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
112    
113            if (resourceManagers != null) {
114                recovery.recoverLog();
115                List copy = watchResourceManagers(resourceManagers);
116                for (Iterator iterator = copy.iterator(); iterator.hasNext();) {
117                    ResourceManager resourceManager = (ResourceManager) iterator.next();
118                    recoverResourceManager(resourceManager);
119                }
120            }
121        }
122    
123        protected List watchResourceManagers(Collection resourceManagers) {
124            return new ArrayList(resourceManagers);
125        }
126    
127        public Transaction getTransaction() {
128            return (Transaction) threadTx.get();
129        }
130    
131        private void associate(TransactionImpl tx) throws InvalidTransactionException {
132            if (tx == null) throw new NullPointerException("tx is null");
133    
134            Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
135            if (existingAssociation != null) {
136                throw new InvalidTransactionException("Specified transaction is already associated with another thread");
137            }
138            threadTx.set(tx);
139            fireThreadAssociated(tx);
140        }
141    
142        private void unassociate() {
143            Transaction tx = getTransaction();
144            if (tx != null) {
145                associatedTransactions.remove(tx);
146                threadTx.set(null);
147                fireThreadUnassociated(tx);
148            }
149        }
150    
151        public void setTransactionTimeout(int seconds) throws SystemException {
152            if (seconds < 0) {
153                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
154            }
155            if (seconds == 0) {
156                transactionTimeoutMilliseconds.set(null);
157            } else {
158                transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
159            }
160        }
161    
162        public int getStatus() throws SystemException {
163            Transaction tx = getTransaction();
164            return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
165        }
166    
167        public void begin() throws NotSupportedException, SystemException {
168            begin(getTransactionTimeoutMilliseconds(0L));
169        }
170    
171        public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
172            if (getStatus() != Status.STATUS_NO_TRANSACTION) {
173                throw new NotSupportedException("Nested Transactions are not supported");
174            }
175            TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
176    //        timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
177            try {
178                associate(tx);
179            } catch (InvalidTransactionException e) {
180                // should not be possible since we just created that transaction and no one has a reference yet
181                throw new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction");
182            }
183            // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
184            this.transactionTimeoutMilliseconds.set(null);
185            return tx;
186        }
187    
188        public Transaction suspend() throws SystemException {
189            Transaction tx = getTransaction();
190            if (tx != null) {
191                unassociate();
192            }
193            return tx;
194        }
195    
196        public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
197            if (getTransaction() != null) {
198                throw new IllegalStateException("Thread already associated with another transaction");
199            }
200            if (!(tx instanceof TransactionImpl)) {
201                throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
202            }
203            associate((TransactionImpl) tx);
204        }
205    
206        public Object getResource(Object key) {
207            TransactionImpl tx = getActiveTransactionImpl();
208            return tx.getResource(key);
209        }
210    
211        private TransactionImpl getActiveTransactionImpl() {
212            TransactionImpl tx = (TransactionImpl)threadTx.get();
213            if (tx == null) {
214                throw new IllegalStateException("No tx on thread");
215            }
216            if (tx.getStatus() != Status.STATUS_ACTIVE) {
217                throw new IllegalStateException("Transaction " + tx + " is not active");
218            }
219            return tx;
220        }
221    
222        public boolean getRollbackOnly() {
223            TransactionImpl tx = getActiveTransactionImpl();
224            return tx.getRollbackOnly();
225        }
226    
227        public Object getTransactionKey() {
228            TransactionImpl tx = getActiveTransactionImpl();
229            return tx.getTransactionKey();
230        }
231    
232        public int getTransactionStatus() {
233            TransactionImpl tx = getActiveTransactionImpl();
234            return tx.getTransactionStatus();
235        }
236    
237        public void putResource(Object key, Object value) {
238            TransactionImpl tx = getActiveTransactionImpl();
239            tx.putResource(key, value);
240        }
241    
242        /**
243         * jta 1.1 method so the jpa implementations can be told to flush their caches.
244         * @param synchronization
245         */
246        public void registerInterposedSynchronization(Synchronization synchronization) {
247            TransactionImpl tx = getActiveTransactionImpl();
248            tx.registerInterposedSynchronization(synchronization);
249        }
250    
251        public void setRollbackOnly() throws IllegalStateException {
252            TransactionImpl tx = (TransactionImpl) threadTx.get();
253            if (tx == null) {
254                throw new IllegalStateException("No transaction associated with current thread");
255            }
256            tx.setRollbackOnly();
257        }
258    
259        public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
260            Transaction tx = getTransaction();
261            if (tx == null) {
262                throw new IllegalStateException("No transaction associated with current thread");
263            }
264            try {
265                tx.commit();
266            } finally {
267                unassociate();
268            }
269        }
270    
271        public void rollback() throws IllegalStateException, SecurityException, SystemException {
272            Transaction tx = getTransaction();
273            if (tx == null) {
274                throw new IllegalStateException("No transaction associated with current thread");
275            }
276            try {
277                tx.rollback();
278            } finally {
279                unassociate();
280            }
281        }
282    
283        //XidImporter implementation
284        public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
285            if (transactionTimeoutMilliseconds < 0) {
286                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
287            }
288            TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
289            return tx;
290        }
291    
292        public void commit(Transaction tx, boolean onePhase) throws XAException {
293            if (onePhase) {
294                try {
295                    tx.commit();
296                } catch (HeuristicMixedException e) {
297                    throw (XAException) new XAException().initCause(e);
298                } catch (HeuristicRollbackException e) {
299                    throw (XAException) new XAException().initCause(e);
300                } catch (RollbackException e) {
301                    throw (XAException) new XAException().initCause(e);
302                } catch (SecurityException e) {
303                    throw (XAException) new XAException().initCause(e);
304                } catch (SystemException e) {
305                    throw (XAException) new XAException().initCause(e);
306                }
307            } else {
308                try {
309                    ((TransactionImpl) tx).preparedCommit();
310                } catch (SystemException e) {
311                    throw (XAException) new XAException().initCause(e);
312                }
313            }
314        }
315    
316        public void forget(Transaction tx) throws XAException {
317            //TODO implement this!
318        }
319    
320        public int prepare(Transaction tx) throws XAException {
321            try {
322                return ((TransactionImpl) tx).prepare();
323            } catch (SystemException e) {
324                throw (XAException) new XAException().initCause(e);
325            } catch (RollbackException e) {
326                throw (XAException) new XAException().initCause(e);
327            }
328        }
329    
330        public void rollback(Transaction tx) throws XAException {
331            try {
332                tx.rollback();
333            } catch (IllegalStateException e) {
334                throw (XAException) new XAException().initCause(e);
335            } catch (SystemException e) {
336                throw (XAException) new XAException().initCause(e);
337            }
338        }
339    
340        long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
341            if (transactionTimeoutMilliseconds != 0) {
342                return transactionTimeoutMilliseconds;
343            }
344            Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
345            if (timeout != null) {
346                return timeout.longValue();
347            }
348            return defaultTransactionTimeoutMilliseconds;
349        }
350    
351        protected void recoverResourceManager(ResourceManager resourceManager) {
352            NamedXAResource namedXAResource;
353            try {
354                namedXAResource = resourceManager.getRecoveryXAResources();
355            } catch (SystemException e) {
356                recoveryLog.error(e);
357                recoveryErrors.add(e);
358                return;
359            }
360            if (namedXAResource != null) {
361                try {
362                    recovery.recoverResourceManager(namedXAResource);
363                } catch (XAException e) {
364                    recoveryLog.error(e);
365                    recoveryErrors.add(e);
366                } finally {
367                    resourceManager.returnResource(namedXAResource);
368                }
369            }
370        }
371    
372    
373        public Map getExternalXids() {
374            return new HashMap(recovery.getExternalXids());
375        }
376    
377        public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
378            transactionAssociationListeners.addIfAbsent(listener);
379        }
380    
381        public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
382            transactionAssociationListeners.remove(listener);
383        }
384    
385        protected void fireThreadAssociated(Transaction tx) {
386            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
387                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
388                try {
389                    listener.threadAssociated(tx);
390                } catch (Exception e) {
391                    log.warn("Error calling transaction association listener", e);
392                }
393            }
394        }
395    
396        protected void fireThreadUnassociated(Transaction tx) {
397            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
398                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
399                try {
400                    listener.threadUnassociated(tx);
401                } catch (Exception e) {
402                    log.warn("Error calling transaction association listener", e);
403                }
404            }
405        }
406    }