View Javadoc

1   /**
2    *
3    *  Licensed to the Apache Software Foundation (ASF) under one or more
4    *  contributor license agreements.  See the NOTICE file distributed with
5    *  this work for additional information regarding copyright ownership.
6    *  The ASF licenses this file to You under the Apache License, Version 2.0
7    *  (the "License"); you may not use this file except in compliance with
8    *  the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing, software
13   *  distributed under the License is distributed on an "AS IS" BASIS,
14   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *  See the License for the specific language governing permissions and
16   *  limitations under the License.
17   */
18  
19  package org.apache.geronimo.transaction.manager;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.IdentityHashMap;
24  import java.util.Iterator;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import javax.transaction.HeuristicMixedException;
31  import javax.transaction.HeuristicRollbackException;
32  import javax.transaction.RollbackException;
33  import javax.transaction.Status;
34  import javax.transaction.Synchronization;
35  import javax.transaction.SystemException;
36  import javax.transaction.Transaction;
37  import javax.transaction.xa.XAException;
38  import javax.transaction.xa.XAResource;
39  import javax.transaction.xa.Xid;
40  import javax.ejb.EJBException;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  
45  /**
46   * Basic local transaction with support for multiple resources.
47   *
48   * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
49   */
50  public class TransactionImpl implements Transaction {
51      private static final Log log = LogFactory.getLog("Transaction");
52  
53      private final XidFactory xidFactory;
54      private final Xid xid;
55      private final TransactionLog txnLog;
56      private final long timeout;
57      private final List syncList = new ArrayList(5);
58      private final LinkedList resourceManagers = new LinkedList();
59      private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
60      private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
61      private int status = Status.STATUS_NO_TRANSACTION;
62      private Object logMark;
63  
64      private final Map resources = new HashMap();
65      private Synchronization interposedSynchronization;
66      private final Map entityManagers = new HashMap();
67  
68      TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
69          this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
70      }
71  
72      TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
73          this.xidFactory = xidFactory;
74          this.txnLog = txnLog;
75          this.xid = xid;
76          this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
77          try {
78              txnLog.begin(xid);
79          } catch (LogException e) {
80              status = Status.STATUS_MARKED_ROLLBACK;
81              SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
82              ex.initCause(e);
83              throw ex;
84          }
85          status = Status.STATUS_ACTIVE;
86      }
87  
88      //reconstruct a tx for an external tx found in recovery
89      public TransactionImpl(Xid xid, TransactionLog txLog) {
90          this.xidFactory = null;
91          this.txnLog = txLog;
92          this.xid = xid;
93          status = Status.STATUS_PREPARED;
94          //TODO is this a good idea?
95          this.timeout = Long.MAX_VALUE;
96      }
97  
98      public synchronized int getStatus() {
99          return status;
100     }
101 
102     public Object getResource(Object key) {
103         return resources.get(key);
104     }
105 
106     public boolean getRollbackOnly() {
107         return status == Status.STATUS_MARKED_ROLLBACK;
108     }
109 
110     public Object getTransactionKey() {
111         return xid;
112     }
113 
114     public int getTransactionStatus() {
115         return status;
116     }
117 
118     public void putResource(Object key, Object value) {
119         if (key == null) {
120             throw new NullPointerException("You must supply a non-null key for putResource");
121         }
122         resources.put(key, value);
123     }
124 
125     public void registerInterposedSynchronization(Synchronization synchronization) {
126         interposedSynchronization = synchronization;
127     }
128 
129     public synchronized void setRollbackOnly() throws IllegalStateException {
130         switch (status) {
131             case Status.STATUS_ACTIVE:
132             case Status.STATUS_PREPARING:
133                 status = Status.STATUS_MARKED_ROLLBACK;
134                 break;
135             case Status.STATUS_MARKED_ROLLBACK:
136             case Status.STATUS_ROLLING_BACK:
137                 // nothing to do
138                 break;
139             default:
140                 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
141         }
142     }
143 
144     public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
145         if (synch == null) {
146             throw new IllegalArgumentException("Synchronization is null");
147         }
148         switch (status) {
149             case Status.STATUS_ACTIVE:
150             case Status.STATUS_PREPARING:
151                 break;
152             case Status.STATUS_MARKED_ROLLBACK:
153                 throw new RollbackException("Transaction is marked for rollback");
154             default:
155                 throw new IllegalStateException("Status is " + getStateString(status));
156         }
157         syncList.add(synch);
158     }
159 
160     public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
161         if (xaRes == null) {
162             throw new IllegalArgumentException("XAResource is null");
163         }
164         switch (status) {
165             case Status.STATUS_ACTIVE:
166                 break;
167             case Status.STATUS_MARKED_ROLLBACK:
168                 throw new RollbackException("Transaction is marked for rollback");
169             default:
170                 throw new IllegalStateException("Status is " + getStateString(status));
171         }
172 
173         if (activeXaResources.containsKey(xaRes)) {
174             throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
175         }
176 
177         try {
178             TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
179             if (manager != null) {
180                 //we know about this one, it was suspended
181                 xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
182                 activeXaResources.put(xaRes, manager);
183                 return true;
184             }
185             //it is not suspended.
186             for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
187                 manager = (TransactionBranch) i.next();
188                 boolean sameRM;
189                 //if the xares is already known, we must be resuming after a suspend.
190                 if (xaRes == manager.getCommitter()) {
191                     throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
192                 }
193                 //Otherwise, see if this is a new xares for the same resource manager
194                 try {
195                     sameRM = xaRes.isSameRM(manager.getCommitter());
196                 } catch (XAException e) {
197                     log.warn("Unexpected error checking for same RM", e);
198                     continue;
199                 }
200                 if (sameRM) {
201                     xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
202                     activeXaResources.put(xaRes, manager);
203                     return true;
204                 }
205             }
206             //we know nothing about this XAResource or resource manager
207             Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
208             xaRes.start(branchId, XAResource.TMNOFLAGS);
209             activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
210             return true;
211         } catch (XAException e) {
212             log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
213             return false;
214         }
215     }
216 
217     public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
218         if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
219             throw new IllegalStateException("invalid flag for delistResource: " + flag);
220         }
221         if (xaRes == null) {
222             throw new IllegalArgumentException("XAResource is null");
223         }
224         switch (status) {
225             case Status.STATUS_ACTIVE:
226             case Status.STATUS_MARKED_ROLLBACK:
227                 break;
228             default:
229                 throw new IllegalStateException("Status is " + getStateString(status));
230         }
231         TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
232         if (manager == null) {
233             if (flag == XAResource.TMSUSPEND) {
234                 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
235             }
236             //not active, and we are not trying to suspend.  We must be ending tx.
237             manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
238             if (manager == null) {
239                 throw new IllegalStateException("Resource not known to transaction: " + xaRes);
240             }
241         }
242 
243         try {
244             xaRes.end(manager.getBranchId(), flag);
245             if (flag == XAResource.TMSUSPEND) {
246                 suspendedXaResources.put(xaRes, manager);
247             }
248             return true;
249         } catch (XAException e) {
250             log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
251             return false;
252         }
253     }
254 
255     //Transaction method, does 2pc
256     public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
257         beforePrepare();
258 
259         try {
260             boolean timedout = false;
261             if (TransactionTimer.getCurrentTime() > timeout) {
262                 status = Status.STATUS_MARKED_ROLLBACK;
263                 timedout = true;
264             }
265 
266             if (status == Status.STATUS_MARKED_ROLLBACK) {
267                 rollbackResources(resourceManagers);
268                 if (timedout) {
269                     throw new RollbackException("Transaction timout");
270                 } else {
271                     throw new RollbackException("Unable to commit: transaction marked for rollback");
272                 }
273             }
274             synchronized (this) {
275                 if (status == Status.STATUS_ACTIVE) {
276                     if (this.resourceManagers.size() == 0) {
277                         // nothing to commit
278                         status = Status.STATUS_COMMITTED;
279                     } else if (this.resourceManagers.size() == 1) {
280                         // one-phase commit decision
281                         status = Status.STATUS_COMMITTING;
282                     } else {
283                         // start prepare part of two-phase
284                         status = Status.STATUS_PREPARING;
285                     }
286                 }
287                 // resourceManagers is now immutable
288             }
289 
290             // no-phase
291             if (resourceManagers.size() == 0) {
292                 synchronized (this) {
293                     status = Status.STATUS_COMMITTED;
294                 }
295                 return;
296             }
297 
298             // one-phase
299             if (resourceManagers.size() == 1) {
300                 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
301                 try {
302                     manager.getCommitter().commit(manager.getBranchId(), true);
303                     synchronized (this) {
304                         status = Status.STATUS_COMMITTED;
305                     }
306                     return;
307                 } catch (XAException e) {
308                     synchronized (this) {
309                         status = Status.STATUS_ROLLEDBACK;
310                     }
311                     throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
312                 }
313             }
314 
315             // two-phase
316             boolean willCommit = internalPrepare();
317 
318             // notify the RMs
319             if (willCommit) {
320                 commitResources(resourceManagers);
321             } else {
322                 rollbackResources(resourceManagers);
323                 throw new RollbackException("Unable to commit");
324             }
325         } finally {
326             afterCompletion();
327             synchronized (this) {
328                 status = Status.STATUS_NO_TRANSACTION;
329             }
330         }
331     }
332 
333     //Used from XATerminator for first phase in a remotely controlled tx.
334     int prepare() throws SystemException, RollbackException {
335         beforePrepare();
336         int result = XAResource.XA_RDONLY;
337         try {
338             LinkedList rms;
339             synchronized (this) {
340                 if (status == Status.STATUS_ACTIVE) {
341                     if (resourceManagers.size() == 0) {
342                         // nothing to commit
343                         status = Status.STATUS_COMMITTED;
344                         return result;
345                     } else {
346                         // start prepare part of two-phase
347                         status = Status.STATUS_PREPARING;
348                     }
349                 }
350                 // resourceManagers is now immutable
351                 rms = resourceManagers;
352             }
353 
354             boolean willCommit = internalPrepare();
355 
356             // notify the RMs
357             if (willCommit) {
358                 if (!rms.isEmpty()) {
359                     result = XAResource.XA_OK;
360                 }
361             } else {
362                 rollbackResources(rms);
363                 throw new RollbackException("Unable to commit");
364             }
365         } finally {
366             if (result == XAResource.XA_RDONLY) {
367                 afterCompletion();
368                 synchronized (this) {
369                     status = Status.STATUS_NO_TRANSACTION;
370                 }
371             }
372         }
373         return result;
374     }
375 
376     //used from XATerminator for commit phase of non-readonly remotely controlled tx.
377     void preparedCommit() throws SystemException {
378         try {
379             commitResources(resourceManagers);
380         } finally {
381             afterCompletion();
382             synchronized (this) {
383                 status = Status.STATUS_NO_TRANSACTION;
384             }
385         }
386     }
387 
388     //helper method used by Transaction.commit and XATerminator prepare.
389     private void beforePrepare() {
390         synchronized (this) {
391             switch (status) {
392                 case Status.STATUS_ACTIVE:
393                 case Status.STATUS_MARKED_ROLLBACK:
394                     break;
395                 default:
396                     throw new IllegalStateException("Status is " + getStateString(status));
397             }
398         }
399 
400         beforeCompletion();
401         endResources();
402     }
403 
404 
405     //helper method used by Transaction.commit and XATerminator prepare.
406     private boolean internalPrepare() throws SystemException {
407 
408         for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
409             synchronized (this) {
410                 if (status != Status.STATUS_PREPARING) {
411                     // we were marked for rollback
412                     break;
413                 }
414             }
415             TransactionBranch manager = (TransactionBranch) rms.next();
416             try {
417                 int vote = manager.getCommitter().prepare(manager.getBranchId());
418                 if (vote == XAResource.XA_RDONLY) {
419                     // we don't need to consider this RM any more
420                     rms.remove();
421                 }
422             } catch (XAException e) {
423                 synchronized (this) {
424                     status = Status.STATUS_MARKED_ROLLBACK;
425                     //TODO document why this is true from the spec.
426                     //XAException during prepare means we can assume resource is rolled back.
427                     rms.remove();
428                     break;
429                 }
430             }
431         }
432 
433         // decision time...
434         boolean willCommit;
435         synchronized (this) {
436             willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
437             if (willCommit) {
438                 status = Status.STATUS_PREPARED;
439             }
440         }
441         // log our decision
442         if (willCommit && !resourceManagers.isEmpty()) {
443             try {
444                 logMark = txnLog.prepare(xid, resourceManagers);
445             } catch (LogException e) {
446                 try {
447                     rollbackResources(resourceManagers);
448                 } catch (Exception se) {
449                     log.error("Unable to rollback after failure to log prepare", se.getCause());
450                 }
451                 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
452             }
453         }
454         return willCommit;
455     }
456 
457     public void rollback() throws IllegalStateException, SystemException {
458         List rms;
459         synchronized (this) {
460             switch (status) {
461                 case Status.STATUS_ACTIVE:
462                     status = Status.STATUS_MARKED_ROLLBACK;
463                     break;
464                 case Status.STATUS_MARKED_ROLLBACK:
465                     break;
466                 default:
467                     throw new IllegalStateException("Status is " + getStateString(status));
468             }
469             rms = resourceManagers;
470         }
471 
472         beforeCompletion();
473         endResources();
474         try {
475             rollbackResources(rms);
476             //only write rollback record if we have already written prepare record.
477             if (logMark != null) {
478                 try {
479                     txnLog.rollback(xid, logMark);
480                 } catch (LogException e) {
481                     try {
482                         rollbackResources(rms);
483                     } catch (Exception se) {
484                         log.error("Unable to rollback after failure to log decision", se.getCause());
485                     }
486                     throw (SystemException) new SystemException("Error logging rollback").initCause(e);
487                 }
488             }
489         } finally {
490             afterCompletion();
491             synchronized (this) {
492                 status = Status.STATUS_NO_TRANSACTION;
493             }
494         }
495     }
496 
497     private void beforeCompletion() {
498         int i = 0;
499         while (true) {
500             Synchronization synch;
501             synchronized (this) {
502                 if (i == syncList.size()) {
503                     if (interposedSynchronization != null) {
504                         synch = interposedSynchronization;
505                         i++;
506                     } else {
507                         return;
508                     }
509                 } else if (i == syncList.size() + 1) {
510                     return;
511                 } else {
512                     synch = (Synchronization) syncList.get(i++);
513                 }
514             }
515             try {
516                 synch.beforeCompletion();
517             } catch (Exception e) {
518                 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
519                 synchronized (this) {
520                     status = Status.STATUS_MARKED_ROLLBACK;
521                 }
522             }
523         }
524     }
525 
526     private void afterCompletion() {
527         // this does not synchronize because nothing can modify our state at this time
528         if (interposedSynchronization != null) {
529             try {
530                 interposedSynchronization.afterCompletion(status);
531             } catch (Exception e) {
532                 log.warn("Unexpected exception from afterCompletion; continuing", e);
533             }
534         }
535         for (Iterator i = syncList.iterator(); i.hasNext();) {
536             Synchronization synch = (Synchronization) i.next();
537             try {
538                 synch.afterCompletion(status);
539             } catch (Exception e) {
540                 log.warn("Unexpected exception from afterCompletion; continuing", e);
541                 continue;
542             }
543         }
544         for (Iterator i = entityManagers.values().iterator(); i.hasNext();) {
545             Closeable entityManager = (Closeable) i.next();
546             entityManager.close();
547         }
548     }
549 
550     private void endResources() {
551         endResources(activeXaResources);
552         endResources(suspendedXaResources);
553     }
554 
555     private void endResources(IdentityHashMap resourceMap) {
556         while (true) {
557             XAResource xaRes;
558             TransactionBranch manager;
559             int flags;
560             synchronized (this) {
561                 Set entrySet = resourceMap.entrySet();
562                 if (entrySet.isEmpty()) {
563                     return;
564                 }
565                 Map.Entry entry = (Map.Entry) entrySet.iterator().next();
566                 xaRes = (XAResource) entry.getKey();
567                 manager = (TransactionBranch) entry.getValue();
568                 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
569                 resourceMap.remove(xaRes);
570             }
571             try {
572                 xaRes.end(manager.getBranchId(), flags);
573             } catch (XAException e) {
574                 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
575                 synchronized (this) {
576                     status = Status.STATUS_MARKED_ROLLBACK;
577                 }
578             }
579         }
580     }
581 
582     private void rollbackResources(List rms) throws SystemException {
583         SystemException cause = null;
584         synchronized (this) {
585             status = Status.STATUS_ROLLING_BACK;
586         }
587         for (Iterator i = rms.iterator(); i.hasNext();) {
588             TransactionBranch manager = (TransactionBranch) i.next();
589             try {
590                 manager.getCommitter().rollback(manager.getBranchId());
591             } catch (XAException e) {
592                 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
593                 if (cause == null) {
594                     cause = new SystemException(e.errorCode);
595                 }
596                 continue;
597             }
598         }
599         synchronized (this) {
600             status = Status.STATUS_ROLLEDBACK;
601         }
602         if (cause != null) {
603             throw cause;
604         }
605     }
606 
607     private void commitResources(List rms) throws SystemException {
608         SystemException cause = null;
609         synchronized (this) {
610             status = Status.STATUS_COMMITTING;
611         }
612         for (Iterator i = rms.iterator(); i.hasNext();) {
613             TransactionBranch manager = (TransactionBranch) i.next();
614             try {
615                 manager.getCommitter().commit(manager.getBranchId(), false);
616             } catch (XAException e) {
617                 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
618                 if (cause == null) {
619                     cause = new SystemException(e.errorCode);
620                 }
621                 continue;
622             }
623         }
624         //if all resources were read only, we didn't write a prepare record.
625         if (!rms.isEmpty()) {
626             try {
627                 txnLog.commit(xid, logMark);
628             } catch (LogException e) {
629                 log.error("Unexpected exception logging commit completion for xid " + xid, e);
630                 throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
631             }
632         }
633         synchronized (this) {
634             status = Status.STATUS_COMMITTED;
635         }
636         if (cause != null) {
637             throw cause;
638         }
639     }
640 
641     private static String getStateString(int status) {
642         switch (status) {
643             case Status.STATUS_ACTIVE:
644                 return "STATUS_ACTIVE";
645             case Status.STATUS_PREPARING:
646                 return "STATUS_PREPARING";
647             case Status.STATUS_PREPARED:
648                 return "STATUS_PREPARED";
649             case Status.STATUS_MARKED_ROLLBACK:
650                 return "STATUS_MARKED_ROLLBACK";
651             case Status.STATUS_ROLLING_BACK:
652                 return "STATUS_ROLLING_BACK";
653             case Status.STATUS_COMMITTING:
654                 return "STATUS_COMMITTING";
655             case Status.STATUS_COMMITTED:
656                 return "STATUS_COMMITTED";
657             case Status.STATUS_ROLLEDBACK:
658                 return "STATUS_ROLLEDBACK";
659             case Status.STATUS_NO_TRANSACTION:
660                 return "STATUS_NO_TRANSACTION";
661             case Status.STATUS_UNKNOWN:
662                 return "STATUS_UNKNOWN";
663             default:
664                 throw new AssertionError();
665         }
666     }
667 
668     public boolean equals(Object obj) {
669         if (obj instanceof TransactionImpl) {
670             TransactionImpl other = (TransactionImpl) obj;
671             return xid.equals(other.xid);
672         } else {
673             return false;
674         }
675     }
676 
677     //when used from recovery, do not add manager to active or suspended resource maps.
678     // The xaresources have already been ended with TMSUCCESS.
679     public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
680         TransactionBranch manager = new TransactionBranch(xaRes, branchId);
681         resourceManagers.add(manager);
682         return manager;
683     }
684 
685     public Object getEntityManager(String persistenceUnit) {
686         return entityManagers.get(persistenceUnit);
687     }
688 
689     public void setEntityManager(String persistenceUnit, Object entityManager) {
690         Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager);
691         if (oldEntityManager != null) {
692             throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid);
693         }
694     }
695 
696     private static class TransactionBranch implements TransactionBranchInfo {
697         private final XAResource committer;
698         private final Xid branchId;
699 
700         public TransactionBranch(XAResource xaRes, Xid branchId) {
701             committer = xaRes;
702             this.branchId = branchId;
703         }
704 
705         public XAResource getCommitter() {
706             return committer;
707         }
708 
709         public Xid getBranchId() {
710             return branchId;
711         }
712 
713         public String getResourceName() {
714             if (committer instanceof NamedXAResource) {
715                 return ((NamedXAResource) committer).getName();
716             } else {
717                 throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer);
718             }
719         }
720 
721         public Xid getBranchXid() {
722             return branchId;
723         }
724     }
725 
726 
727 }