001    /**
002     *
003     *  Licensed to the Apache Software Foundation (ASF) under one or more
004     *  contributor license agreements.  See the NOTICE file distributed with
005     *  this work for additional information regarding copyright ownership.
006     *  The ASF licenses this file to You under the Apache License, Version 2.0
007     *  (the "License"); you may not use this file except in compliance with
008     *  the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     *  Unless required by applicable law or agreed to in writing, software
013     *  distributed under the License is distributed on an "AS IS" BASIS,
014     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     *  See the License for the specific language governing permissions and
016     *  limitations under the License.
017     */
018    
019    package org.apache.geronimo.transaction.manager;
020    
021    import java.util.ArrayList;
022    import java.util.HashMap;
023    import java.util.IdentityHashMap;
024    import java.util.Iterator;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    
030    import javax.transaction.HeuristicMixedException;
031    import javax.transaction.HeuristicRollbackException;
032    import javax.transaction.RollbackException;
033    import javax.transaction.Status;
034    import javax.transaction.Synchronization;
035    import javax.transaction.SystemException;
036    import javax.transaction.Transaction;
037    import javax.transaction.xa.XAException;
038    import javax.transaction.xa.XAResource;
039    import javax.transaction.xa.Xid;
040    import javax.ejb.EJBException;
041    
042    import org.apache.commons.logging.Log;
043    import org.apache.commons.logging.LogFactory;
044    
045    /**
046     * Basic local transaction with support for multiple resources.
047     *
048     * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
049     */
050    public class TransactionImpl implements Transaction {
051        private static final Log log = LogFactory.getLog("Transaction");
052    
053        private final XidFactory xidFactory;
054        private final Xid xid;
055        private final TransactionLog txnLog;
056        private final long timeout;
057        private final List syncList = new ArrayList(5);
058        private final LinkedList resourceManagers = new LinkedList();
059        private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
060        private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
061        private int status = Status.STATUS_NO_TRANSACTION;
062        private Object logMark;
063    
064        private final Map resources = new HashMap();
065        private Synchronization interposedSynchronization;
066        private final Map entityManagers = new HashMap();
067    
068        TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
069            this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
070        }
071    
072        TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
073            this.xidFactory = xidFactory;
074            this.txnLog = txnLog;
075            this.xid = xid;
076            this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
077            try {
078                txnLog.begin(xid);
079            } catch (LogException e) {
080                status = Status.STATUS_MARKED_ROLLBACK;
081                SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
082                ex.initCause(e);
083                throw ex;
084            }
085            status = Status.STATUS_ACTIVE;
086        }
087    
088        //reconstruct a tx for an external tx found in recovery
089        public TransactionImpl(Xid xid, TransactionLog txLog) {
090            this.xidFactory = null;
091            this.txnLog = txLog;
092            this.xid = xid;
093            status = Status.STATUS_PREPARED;
094            //TODO is this a good idea?
095            this.timeout = Long.MAX_VALUE;
096        }
097    
098        public synchronized int getStatus() {
099            return status;
100        }
101    
102        public Object getResource(Object key) {
103            return resources.get(key);
104        }
105    
106        public boolean getRollbackOnly() {
107            return status == Status.STATUS_MARKED_ROLLBACK;
108        }
109    
110        public Object getTransactionKey() {
111            return xid;
112        }
113    
114        public int getTransactionStatus() {
115            return status;
116        }
117    
118        public void putResource(Object key, Object value) {
119            if (key == null) {
120                throw new NullPointerException("You must supply a non-null key for putResource");
121            }
122            resources.put(key, value);
123        }
124    
125        public void registerInterposedSynchronization(Synchronization synchronization) {
126            interposedSynchronization = synchronization;
127        }
128    
129        public synchronized void setRollbackOnly() throws IllegalStateException {
130            switch (status) {
131                case Status.STATUS_ACTIVE:
132                case Status.STATUS_PREPARING:
133                    status = Status.STATUS_MARKED_ROLLBACK;
134                    break;
135                case Status.STATUS_MARKED_ROLLBACK:
136                case Status.STATUS_ROLLING_BACK:
137                    // nothing to do
138                    break;
139                default:
140                    throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
141            }
142        }
143    
144        public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
145            if (synch == null) {
146                throw new IllegalArgumentException("Synchronization is null");
147            }
148            switch (status) {
149                case Status.STATUS_ACTIVE:
150                case Status.STATUS_PREPARING:
151                    break;
152                case Status.STATUS_MARKED_ROLLBACK:
153                    throw new RollbackException("Transaction is marked for rollback");
154                default:
155                    throw new IllegalStateException("Status is " + getStateString(status));
156            }
157            syncList.add(synch);
158        }
159    
160        public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
161            if (xaRes == null) {
162                throw new IllegalArgumentException("XAResource is null");
163            }
164            switch (status) {
165                case Status.STATUS_ACTIVE:
166                    break;
167                case Status.STATUS_MARKED_ROLLBACK:
168                    throw new RollbackException("Transaction is marked for rollback");
169                default:
170                    throw new IllegalStateException("Status is " + getStateString(status));
171            }
172    
173            if (activeXaResources.containsKey(xaRes)) {
174                throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
175            }
176    
177            try {
178                TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
179                if (manager != null) {
180                    //we know about this one, it was suspended
181                    xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
182                    activeXaResources.put(xaRes, manager);
183                    return true;
184                }
185                //it is not suspended.
186                for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
187                    manager = (TransactionBranch) i.next();
188                    boolean sameRM;
189                    //if the xares is already known, we must be resuming after a suspend.
190                    if (xaRes == manager.getCommitter()) {
191                        throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
192                    }
193                    //Otherwise, see if this is a new xares for the same resource manager
194                    try {
195                        sameRM = xaRes.isSameRM(manager.getCommitter());
196                    } catch (XAException e) {
197                        log.warn("Unexpected error checking for same RM", e);
198                        continue;
199                    }
200                    if (sameRM) {
201                        xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
202                        activeXaResources.put(xaRes, manager);
203                        return true;
204                    }
205                }
206                //we know nothing about this XAResource or resource manager
207                Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
208                xaRes.start(branchId, XAResource.TMNOFLAGS);
209                activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
210                return true;
211            } catch (XAException e) {
212                log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
213                return false;
214            }
215        }
216    
217        public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
218            if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
219                throw new IllegalStateException("invalid flag for delistResource: " + flag);
220            }
221            if (xaRes == null) {
222                throw new IllegalArgumentException("XAResource is null");
223            }
224            switch (status) {
225                case Status.STATUS_ACTIVE:
226                case Status.STATUS_MARKED_ROLLBACK:
227                    break;
228                default:
229                    throw new IllegalStateException("Status is " + getStateString(status));
230            }
231            TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
232            if (manager == null) {
233                if (flag == XAResource.TMSUSPEND) {
234                    throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
235                }
236                //not active, and we are not trying to suspend.  We must be ending tx.
237                manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
238                if (manager == null) {
239                    throw new IllegalStateException("Resource not known to transaction: " + xaRes);
240                }
241            }
242    
243            try {
244                xaRes.end(manager.getBranchId(), flag);
245                if (flag == XAResource.TMSUSPEND) {
246                    suspendedXaResources.put(xaRes, manager);
247                }
248                return true;
249            } catch (XAException e) {
250                log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
251                return false;
252            }
253        }
254    
255        //Transaction method, does 2pc
256        public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
257            beforePrepare();
258    
259            try {
260                boolean timedout = false;
261                if (TransactionTimer.getCurrentTime() > timeout) {
262                    status = Status.STATUS_MARKED_ROLLBACK;
263                    timedout = true;
264                }
265    
266                if (status == Status.STATUS_MARKED_ROLLBACK) {
267                    rollbackResources(resourceManagers);
268                    if (timedout) {
269                        throw new RollbackException("Transaction timout");
270                    } else {
271                        throw new RollbackException("Unable to commit: transaction marked for rollback");
272                    }
273                }
274                synchronized (this) {
275                    if (status == Status.STATUS_ACTIVE) {
276                        if (this.resourceManagers.size() == 0) {
277                            // nothing to commit
278                            status = Status.STATUS_COMMITTED;
279                        } else if (this.resourceManagers.size() == 1) {
280                            // one-phase commit decision
281                            status = Status.STATUS_COMMITTING;
282                        } else {
283                            // start prepare part of two-phase
284                            status = Status.STATUS_PREPARING;
285                        }
286                    }
287                    // resourceManagers is now immutable
288                }
289    
290                // no-phase
291                if (resourceManagers.size() == 0) {
292                    synchronized (this) {
293                        status = Status.STATUS_COMMITTED;
294                    }
295                    return;
296                }
297    
298                // one-phase
299                if (resourceManagers.size() == 1) {
300                    TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
301                    try {
302                        manager.getCommitter().commit(manager.getBranchId(), true);
303                        synchronized (this) {
304                            status = Status.STATUS_COMMITTED;
305                        }
306                        return;
307                    } catch (XAException e) {
308                        synchronized (this) {
309                            status = Status.STATUS_ROLLEDBACK;
310                        }
311                        throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
312                    }
313                }
314    
315                // two-phase
316                boolean willCommit = internalPrepare();
317    
318                // notify the RMs
319                if (willCommit) {
320                    commitResources(resourceManagers);
321                } else {
322                    rollbackResources(resourceManagers);
323                    throw new RollbackException("Unable to commit");
324                }
325            } finally {
326                afterCompletion();
327                synchronized (this) {
328                    status = Status.STATUS_NO_TRANSACTION;
329                }
330            }
331        }
332    
333        //Used from XATerminator for first phase in a remotely controlled tx.
334        int prepare() throws SystemException, RollbackException {
335            beforePrepare();
336            int result = XAResource.XA_RDONLY;
337            try {
338                LinkedList rms;
339                synchronized (this) {
340                    if (status == Status.STATUS_ACTIVE) {
341                        if (resourceManagers.size() == 0) {
342                            // nothing to commit
343                            status = Status.STATUS_COMMITTED;
344                            return result;
345                        } else {
346                            // start prepare part of two-phase
347                            status = Status.STATUS_PREPARING;
348                        }
349                    }
350                    // resourceManagers is now immutable
351                    rms = resourceManagers;
352                }
353    
354                boolean willCommit = internalPrepare();
355    
356                // notify the RMs
357                if (willCommit) {
358                    if (!rms.isEmpty()) {
359                        result = XAResource.XA_OK;
360                    }
361                } else {
362                    rollbackResources(rms);
363                    throw new RollbackException("Unable to commit");
364                }
365            } finally {
366                if (result == XAResource.XA_RDONLY) {
367                    afterCompletion();
368                    synchronized (this) {
369                        status = Status.STATUS_NO_TRANSACTION;
370                    }
371                }
372            }
373            return result;
374        }
375    
376        //used from XATerminator for commit phase of non-readonly remotely controlled tx.
377        void preparedCommit() throws SystemException {
378            try {
379                commitResources(resourceManagers);
380            } finally {
381                afterCompletion();
382                synchronized (this) {
383                    status = Status.STATUS_NO_TRANSACTION;
384                }
385            }
386        }
387    
388        //helper method used by Transaction.commit and XATerminator prepare.
389        private void beforePrepare() {
390            synchronized (this) {
391                switch (status) {
392                    case Status.STATUS_ACTIVE:
393                    case Status.STATUS_MARKED_ROLLBACK:
394                        break;
395                    default:
396                        throw new IllegalStateException("Status is " + getStateString(status));
397                }
398            }
399    
400            beforeCompletion();
401            endResources();
402        }
403    
404    
405        //helper method used by Transaction.commit and XATerminator prepare.
406        private boolean internalPrepare() throws SystemException {
407    
408            for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
409                synchronized (this) {
410                    if (status != Status.STATUS_PREPARING) {
411                        // we were marked for rollback
412                        break;
413                    }
414                }
415                TransactionBranch manager = (TransactionBranch) rms.next();
416                try {
417                    int vote = manager.getCommitter().prepare(manager.getBranchId());
418                    if (vote == XAResource.XA_RDONLY) {
419                        // we don't need to consider this RM any more
420                        rms.remove();
421                    }
422                } catch (XAException e) {
423                    synchronized (this) {
424                        status = Status.STATUS_MARKED_ROLLBACK;
425                        //TODO document why this is true from the spec.
426                        //XAException during prepare means we can assume resource is rolled back.
427                        rms.remove();
428                        break;
429                    }
430                }
431            }
432    
433            // decision time...
434            boolean willCommit;
435            synchronized (this) {
436                willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
437                if (willCommit) {
438                    status = Status.STATUS_PREPARED;
439                }
440            }
441            // log our decision
442            if (willCommit && !resourceManagers.isEmpty()) {
443                try {
444                    logMark = txnLog.prepare(xid, resourceManagers);
445                } catch (LogException e) {
446                    try {
447                        rollbackResources(resourceManagers);
448                    } catch (Exception se) {
449                        log.error("Unable to rollback after failure to log prepare", se.getCause());
450                    }
451                    throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
452                }
453            }
454            return willCommit;
455        }
456    
457        public void rollback() throws IllegalStateException, SystemException {
458            List rms;
459            synchronized (this) {
460                switch (status) {
461                    case Status.STATUS_ACTIVE:
462                        status = Status.STATUS_MARKED_ROLLBACK;
463                        break;
464                    case Status.STATUS_MARKED_ROLLBACK:
465                        break;
466                    default:
467                        throw new IllegalStateException("Status is " + getStateString(status));
468                }
469                rms = resourceManagers;
470            }
471    
472            beforeCompletion();
473            endResources();
474            try {
475                rollbackResources(rms);
476                //only write rollback record if we have already written prepare record.
477                if (logMark != null) {
478                    try {
479                        txnLog.rollback(xid, logMark);
480                    } catch (LogException e) {
481                        try {
482                            rollbackResources(rms);
483                        } catch (Exception se) {
484                            log.error("Unable to rollback after failure to log decision", se.getCause());
485                        }
486                        throw (SystemException) new SystemException("Error logging rollback").initCause(e);
487                    }
488                }
489            } finally {
490                afterCompletion();
491                synchronized (this) {
492                    status = Status.STATUS_NO_TRANSACTION;
493                }
494            }
495        }
496    
497        private void beforeCompletion() {
498            int i = 0;
499            while (true) {
500                Synchronization synch;
501                synchronized (this) {
502                    if (i == syncList.size()) {
503                        if (interposedSynchronization != null) {
504                            synch = interposedSynchronization;
505                            i++;
506                        } else {
507                            return;
508                        }
509                    } else if (i == syncList.size() + 1) {
510                        return;
511                    } else {
512                        synch = (Synchronization) syncList.get(i++);
513                    }
514                }
515                try {
516                    synch.beforeCompletion();
517                } catch (Exception e) {
518                    log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
519                    synchronized (this) {
520                        status = Status.STATUS_MARKED_ROLLBACK;
521                    }
522                }
523            }
524        }
525    
526        private void afterCompletion() {
527            // this does not synchronize because nothing can modify our state at this time
528            if (interposedSynchronization != null) {
529                try {
530                    interposedSynchronization.afterCompletion(status);
531                } catch (Exception e) {
532                    log.warn("Unexpected exception from afterCompletion; continuing", e);
533                }
534            }
535            for (Iterator i = syncList.iterator(); i.hasNext();) {
536                Synchronization synch = (Synchronization) i.next();
537                try {
538                    synch.afterCompletion(status);
539                } catch (Exception e) {
540                    log.warn("Unexpected exception from afterCompletion; continuing", e);
541                    continue;
542                }
543            }
544            for (Iterator i = entityManagers.values().iterator(); i.hasNext();) {
545                Closeable entityManager = (Closeable) i.next();
546                entityManager.close();
547            }
548        }
549    
550        private void endResources() {
551            endResources(activeXaResources);
552            endResources(suspendedXaResources);
553        }
554    
555        private void endResources(IdentityHashMap resourceMap) {
556            while (true) {
557                XAResource xaRes;
558                TransactionBranch manager;
559                int flags;
560                synchronized (this) {
561                    Set entrySet = resourceMap.entrySet();
562                    if (entrySet.isEmpty()) {
563                        return;
564                    }
565                    Map.Entry entry = (Map.Entry) entrySet.iterator().next();
566                    xaRes = (XAResource) entry.getKey();
567                    manager = (TransactionBranch) entry.getValue();
568                    flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
569                    resourceMap.remove(xaRes);
570                }
571                try {
572                    xaRes.end(manager.getBranchId(), flags);
573                } catch (XAException e) {
574                    log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
575                    synchronized (this) {
576                        status = Status.STATUS_MARKED_ROLLBACK;
577                    }
578                }
579            }
580        }
581    
582        private void rollbackResources(List rms) throws SystemException {
583            SystemException cause = null;
584            synchronized (this) {
585                status = Status.STATUS_ROLLING_BACK;
586            }
587            for (Iterator i = rms.iterator(); i.hasNext();) {
588                TransactionBranch manager = (TransactionBranch) i.next();
589                try {
590                    manager.getCommitter().rollback(manager.getBranchId());
591                } catch (XAException e) {
592                    log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
593                    if (cause == null) {
594                        cause = new SystemException(e.errorCode);
595                    }
596                    continue;
597                }
598            }
599            synchronized (this) {
600                status = Status.STATUS_ROLLEDBACK;
601            }
602            if (cause != null) {
603                throw cause;
604            }
605        }
606    
607        private void commitResources(List rms) throws SystemException {
608            SystemException cause = null;
609            synchronized (this) {
610                status = Status.STATUS_COMMITTING;
611            }
612            for (Iterator i = rms.iterator(); i.hasNext();) {
613                TransactionBranch manager = (TransactionBranch) i.next();
614                try {
615                    manager.getCommitter().commit(manager.getBranchId(), false);
616                } catch (XAException e) {
617                    log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
618                    if (cause == null) {
619                        cause = new SystemException(e.errorCode);
620                    }
621                    continue;
622                }
623            }
624            //if all resources were read only, we didn't write a prepare record.
625            if (!rms.isEmpty()) {
626                try {
627                    txnLog.commit(xid, logMark);
628                } catch (LogException e) {
629                    log.error("Unexpected exception logging commit completion for xid " + xid, e);
630                    throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
631                }
632            }
633            synchronized (this) {
634                status = Status.STATUS_COMMITTED;
635            }
636            if (cause != null) {
637                throw cause;
638            }
639        }
640    
641        private static String getStateString(int status) {
642            switch (status) {
643                case Status.STATUS_ACTIVE:
644                    return "STATUS_ACTIVE";
645                case Status.STATUS_PREPARING:
646                    return "STATUS_PREPARING";
647                case Status.STATUS_PREPARED:
648                    return "STATUS_PREPARED";
649                case Status.STATUS_MARKED_ROLLBACK:
650                    return "STATUS_MARKED_ROLLBACK";
651                case Status.STATUS_ROLLING_BACK:
652                    return "STATUS_ROLLING_BACK";
653                case Status.STATUS_COMMITTING:
654                    return "STATUS_COMMITTING";
655                case Status.STATUS_COMMITTED:
656                    return "STATUS_COMMITTED";
657                case Status.STATUS_ROLLEDBACK:
658                    return "STATUS_ROLLEDBACK";
659                case Status.STATUS_NO_TRANSACTION:
660                    return "STATUS_NO_TRANSACTION";
661                case Status.STATUS_UNKNOWN:
662                    return "STATUS_UNKNOWN";
663                default:
664                    throw new AssertionError();
665            }
666        }
667    
668        public boolean equals(Object obj) {
669            if (obj instanceof TransactionImpl) {
670                TransactionImpl other = (TransactionImpl) obj;
671                return xid.equals(other.xid);
672            } else {
673                return false;
674            }
675        }
676    
677        //when used from recovery, do not add manager to active or suspended resource maps.
678        // The xaresources have already been ended with TMSUCCESS.
679        public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
680            TransactionBranch manager = new TransactionBranch(xaRes, branchId);
681            resourceManagers.add(manager);
682            return manager;
683        }
684    
685        public Object getEntityManager(String persistenceUnit) {
686            return entityManagers.get(persistenceUnit);
687        }
688    
689        public void setEntityManager(String persistenceUnit, Object entityManager) {
690            Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager);
691            if (oldEntityManager != null) {
692                throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid);
693            }
694        }
695    
696        private static class TransactionBranch implements TransactionBranchInfo {
697            private final XAResource committer;
698            private final Xid branchId;
699    
700            public TransactionBranch(XAResource xaRes, Xid branchId) {
701                committer = xaRes;
702                this.branchId = branchId;
703            }
704    
705            public XAResource getCommitter() {
706                return committer;
707            }
708    
709            public Xid getBranchId() {
710                return branchId;
711            }
712    
713            public String getResourceName() {
714                if (committer instanceof NamedXAResource) {
715                    return ((NamedXAResource) committer).getName();
716                } else {
717                    throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer);
718                }
719            }
720    
721            public Xid getBranchXid() {
722                return branchId;
723            }
724        }
725    
726    
727    }