001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.transaction.manager;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    
026    import javax.transaction.*;
027    import javax.transaction.xa.XAException;
028    import javax.transaction.xa.Xid;
029    
030    import java.util.concurrent.ConcurrentHashMap;
031    import java.util.concurrent.CopyOnWriteArrayList;
032    import java.util.concurrent.atomic.AtomicLong;
033    import org.apache.geronimo.transaction.log.UnrecoverableLog;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Simple implementation of a transaction manager.
039     *
040     * @version $Rev: 727065 $ $Date: 2008-12-16 10:22:22 -0500 (Tue, 16 Dec 2008) $
041     */
042    public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager {
043        private static final Logger log = LoggerFactory.getLogger(TransactionManagerImpl.class);
044        protected static final int DEFAULT_TIMEOUT = 600;
045        protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
046    
047        final TransactionLog transactionLog;
048        final XidFactory xidFactory;
049        private final int defaultTransactionTimeoutMilliseconds;
050        private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
051        private final ThreadLocal threadTx = new ThreadLocal();
052        private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
053        private static final Logger recoveryLog = LoggerFactory.getLogger("RecoveryController");
054        final Recovery recovery;
055        private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
056        private List recoveryErrors = new ArrayList();
057        // statistics
058        private AtomicLong totalCommits = new AtomicLong(0);
059        private AtomicLong totalRollBacks = new AtomicLong(0);
060        private AtomicLong activeCount = new AtomicLong(0);
061    
062        public TransactionManagerImpl() throws XAException {
063            this(DEFAULT_TIMEOUT,
064                    null,
065                    null
066            );
067        }
068    
069        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
070            this(defaultTransactionTimeoutSeconds,
071                    null,
072                    null
073            );
074        }
075    
076        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
077            this(defaultTransactionTimeoutSeconds,
078                    null,
079                    transactionLog
080            );
081        }
082    
083        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException {
084            if (defaultTransactionTimeoutSeconds <= 0) {
085                throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
086            }
087            this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
088    
089            if (transactionLog == null) {
090                this.transactionLog = new UnrecoverableLog();
091            } else {
092                this.transactionLog = transactionLog;
093            }
094    
095            if (xidFactory != null) {
096                this.xidFactory = xidFactory;
097            } else {
098                this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
099            }
100    
101            recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
102            recovery.recoverLog();
103        }
104    
105        public Transaction getTransaction() {
106            return (Transaction) threadTx.get();
107        }
108    
109        private void associate(TransactionImpl tx) throws InvalidTransactionException {
110            if (tx.getStatus() == Status.STATUS_NO_TRANSACTION) {
111                throw new InvalidTransactionException("Cannot resume invalid transaction: " + tx);
112            } else {
113                Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
114                if (existingAssociation != null) {
115                    throw new InvalidTransactionException("Specified transaction is already associated with another thread");
116                }
117                threadTx.set(tx);
118                fireThreadAssociated(tx);
119                activeCount.getAndIncrement();
120            }
121        } 
122    
123        private void unassociate() {
124            Transaction tx = getTransaction();
125            if (tx != null) {
126                associatedTransactions.remove(tx);
127                threadTx.set(null);
128                fireThreadUnassociated(tx);
129                activeCount.getAndDecrement();
130            }
131        }
132    
133        public void setTransactionTimeout(int seconds) throws SystemException {
134            if (seconds < 0) {
135                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
136            }
137            if (seconds == 0) {
138                transactionTimeoutMilliseconds.set(null);
139            } else {
140                transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
141            }
142        }
143    
144        public int getStatus() throws SystemException {
145            Transaction tx = getTransaction();
146            return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
147        }
148    
149        public void begin() throws NotSupportedException, SystemException {
150            begin(getTransactionTimeoutMilliseconds(0L));
151        }
152    
153        public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
154            if (getStatus() != Status.STATUS_NO_TRANSACTION) {
155                throw new NotSupportedException("Nested Transactions are not supported");
156            }
157            TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
158    //        timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
159            try {
160                associate(tx);
161            } catch (InvalidTransactionException e) {
162                // should not be possible since we just created that transaction and no one has a reference yet
163                throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e);
164            }
165            // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
166            this.transactionTimeoutMilliseconds.set(null);
167            return tx;
168        }
169    
170        public Transaction suspend() throws SystemException {
171            Transaction tx = getTransaction();
172            if (tx != null) {
173                unassociate();
174            }
175            return tx;
176        }
177    
178        public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
179            if (getTransaction() != null && tx != getTransaction()) {
180                throw new IllegalStateException("Thread already associated with another transaction");
181            }
182            if (tx != null && tx != getTransaction()) {
183                if (!(tx instanceof TransactionImpl)) {
184                    throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
185                }
186                
187                associate((TransactionImpl) tx);
188            }
189        }
190    
191        public Object getResource(Object key) {
192            TransactionImpl tx = getActiveTransactionImpl();
193            return tx.getResource(key);
194        }
195    
196        private TransactionImpl getActiveTransactionImpl() {
197            TransactionImpl tx = (TransactionImpl)threadTx.get();
198            if (tx == null) {
199                throw new IllegalStateException("No tx on thread");
200            }
201            if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
202                throw new IllegalStateException("Transaction " + tx + " is not active");
203            }
204            return tx;
205        }
206    
207        public boolean getRollbackOnly() {
208            TransactionImpl tx = getActiveTransactionImpl();
209            return tx.getRollbackOnly();
210        }
211    
212        public Object getTransactionKey() {
213            TransactionImpl tx = (TransactionImpl) getTransaction();
214            return tx == null ? null: tx.getTransactionKey();
215        }
216    
217        public int getTransactionStatus() {
218            TransactionImpl tx = (TransactionImpl) getTransaction();
219            return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus();
220        }
221    
222        public void putResource(Object key, Object value) {
223            TransactionImpl tx = getActiveTransactionImpl();
224            tx.putResource(key, value);
225        }
226    
227        /**
228         * jta 1.1 method so the jpa implementations can be told to flush their caches.
229         * @param synchronization
230         */
231        public void registerInterposedSynchronization(Synchronization synchronization) {
232            TransactionImpl tx = getActiveTransactionImpl();
233            tx.registerInterposedSynchronization(synchronization);
234        }
235    
236        public void setRollbackOnly() throws IllegalStateException {
237            TransactionImpl tx = (TransactionImpl) threadTx.get();
238            if (tx == null) {
239                throw new IllegalStateException("No transaction associated with current thread");
240            }
241            tx.setRollbackOnly();
242        }
243    
244        public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
245            Transaction tx = getTransaction();
246            if (tx == null) {
247                throw new IllegalStateException("No transaction associated with current thread");
248            }
249            try {
250                tx.commit();
251            } finally {
252                unassociate();
253            }
254            totalCommits.getAndIncrement();
255        }
256    
257        public void rollback() throws IllegalStateException, SecurityException, SystemException {
258            Transaction tx = getTransaction();
259            if (tx == null) {
260                throw new IllegalStateException("No transaction associated with current thread");
261            }
262            try {
263                tx.rollback();
264            } finally {
265                unassociate();
266            }
267            totalRollBacks.getAndIncrement();
268        }
269    
270        //XidImporter implementation
271        public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
272            if (transactionTimeoutMilliseconds < 0) {
273                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
274            }
275            TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
276            return tx;
277        }
278    
279        public void commit(Transaction tx, boolean onePhase) throws XAException {
280            if (onePhase) {
281                try {
282                    tx.commit();
283                } catch (HeuristicMixedException e) {
284                    throw (XAException) new XAException().initCause(e);
285                } catch (HeuristicRollbackException e) {
286                    throw (XAException) new XAException().initCause(e);
287                } catch (RollbackException e) {
288                    throw (XAException) new XAException().initCause(e);
289                } catch (SecurityException e) {
290                    throw (XAException) new XAException().initCause(e);
291                } catch (SystemException e) {
292                    throw (XAException) new XAException().initCause(e);
293                }
294            } else {
295                try {
296                    ((TransactionImpl) tx).preparedCommit();
297                } catch (HeuristicMixedException e) {
298                    throw (XAException) new XAException().initCause(e);
299                } catch (HeuristicRollbackException e) {
300                    throw (XAException) new XAException().initCause(e);
301                } catch (SystemException e) {
302                    throw (XAException) new XAException().initCause(e);
303                }
304            }
305            totalCommits.getAndIncrement();
306        }
307    
308        public void forget(Transaction tx) throws XAException {
309            //TODO implement this!
310        }
311    
312        public int prepare(Transaction tx) throws XAException {
313            try {
314                return ((TransactionImpl) tx).prepare();
315            } catch (SystemException e) {
316                throw (XAException) new XAException().initCause(e);
317            } catch (RollbackException e) {
318                throw (XAException) new XAException().initCause(e);
319            }
320        }
321    
322        public void rollback(Transaction tx) throws XAException {
323            try {
324                tx.rollback();
325            } catch (IllegalStateException e) {
326                throw (XAException) new XAException().initCause(e);
327            } catch (SystemException e) {
328                throw (XAException) new XAException().initCause(e);
329            }
330            totalRollBacks.getAndIncrement();
331        }
332    
333        long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
334            if (transactionTimeoutMilliseconds != 0) {
335                return transactionTimeoutMilliseconds;
336            }
337            Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
338            if (timeout != null) {
339                return timeout.longValue();
340            }
341            return defaultTransactionTimeoutMilliseconds;
342        }
343    
344        //Recovery
345        public void recoveryError(Exception e) {
346            recoveryLog.error("Recovery error", e);
347            recoveryErrors.add(e);
348        }
349    
350        public void recoverResourceManager(NamedXAResource xaResource) {
351            try {
352                recovery.recoverResourceManager(xaResource);
353            } catch (XAException e) {
354                recoveryError(e);
355            }
356        }
357    
358        public Map getExternalXids() {
359            return new HashMap(recovery.getExternalXids());
360        }
361    
362        public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
363            transactionAssociationListeners.addIfAbsent(listener);
364        }
365    
366        public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
367            transactionAssociationListeners.remove(listener);
368        }
369    
370        protected void fireThreadAssociated(Transaction tx) {
371            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
372                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
373                try {
374                    listener.threadAssociated(tx);
375                } catch (Exception e) {
376                    log.warn("Error calling transaction association listener", e);
377                }
378            }
379        }
380    
381        protected void fireThreadUnassociated(Transaction tx) {
382            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
383                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
384                try {
385                    listener.threadUnassociated(tx);
386                } catch (Exception e) {
387                    log.warn("Error calling transaction association listener", e);
388                }
389            }
390        }
391    
392        /**
393         * Returns the number of active transactions.
394         */
395        public long getActiveCount() {
396            return activeCount.longValue();
397        }
398    
399        /**
400         * Return the number of total commits
401         */
402        public long getTotalCommits() {
403            return totalCommits.longValue();
404        }
405    
406        /**
407         * Returns the number of total rollbacks
408         */
409        public long getTotalRollbacks() {
410            return totalRollBacks.longValue();
411        }
412    
413        /**
414         * Reset statistics
415         */
416        public void resetStatistics() {
417            totalCommits.getAndSet(0);
418            totalRollBacks.getAndSet(0);
419        }
420    }