001 /**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements. See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.geronimo.transaction.manager;
020
021 import java.util.ArrayList;
022 import java.util.HashMap;
023 import java.util.IdentityHashMap;
024 import java.util.Iterator;
025 import java.util.LinkedList;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029
030 import javax.transaction.HeuristicMixedException;
031 import javax.transaction.HeuristicRollbackException;
032 import javax.transaction.RollbackException;
033 import javax.transaction.Status;
034 import javax.transaction.Synchronization;
035 import javax.transaction.SystemException;
036 import javax.transaction.Transaction;
037 import javax.transaction.xa.XAException;
038 import javax.transaction.xa.XAResource;
039 import javax.transaction.xa.Xid;
040 import javax.ejb.EJBException;
041
042 import org.apache.commons.logging.Log;
043 import org.apache.commons.logging.LogFactory;
044
045 /**
046 * Basic local transaction with support for multiple resources.
047 *
048 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
049 */
050 public class TransactionImpl implements Transaction {
051 private static final Log log = LogFactory.getLog("Transaction");
052
053 private final XidFactory xidFactory;
054 private final Xid xid;
055 private final TransactionLog txnLog;
056 private final long timeout;
057 private final List syncList = new ArrayList(5);
058 private final LinkedList resourceManagers = new LinkedList();
059 private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
060 private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
061 private int status = Status.STATUS_NO_TRANSACTION;
062 private Object logMark;
063
064 private final Map resources = new HashMap();
065 private Synchronization interposedSynchronization;
066 private final Map entityManagers = new HashMap();
067
/**
 * Creates a transaction whose global xid is freshly obtained from the supplied factory.
 *
 * @param xidFactory factory asked for the global xid; also kept for later branch creation
 * @param txnLog log handed to the general constructor
 * @param transactionTimeoutMilliseconds timeout handed to the general constructor
 * @throws SystemException if the general constructor fails to log the begin record
 */
TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
    // Everything else is done by the four-argument constructor.
    this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
}
071
072 TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
073 this.xidFactory = xidFactory;
074 this.txnLog = txnLog;
075 this.xid = xid;
076 this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
077 try {
078 txnLog.begin(xid);
079 } catch (LogException e) {
080 status = Status.STATUS_MARKED_ROLLBACK;
081 SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
082 ex.initCause(e);
083 throw ex;
084 }
085 status = Status.STATUS_ACTIVE;
086 }
087
088 //reconstruct a tx for an external tx found in recovery
/**
 * Rebuilds a transaction object in the STATUS_PREPARED state for the given xid.
 * No begin record is written and no xid factory is attached.
 *
 * @param xid global transaction id of the transaction being reconstructed
 * @param txLog log used for the later commit or rollback records
 */
public TransactionImpl(Xid xid, TransactionLog txLog) {
    this.xid = xid;
    this.txnLog = txLog;
    // The factory is only consulted by enlistResource, which rejects any status other than STATUS_ACTIVE.
    this.xidFactory = null;
    //TODO is this a good idea?
    this.timeout = Long.MAX_VALUE;
    this.status = Status.STATUS_PREPARED;
}
097
098 public synchronized int getStatus() {
099 return status;
100 }
101
102 public Object getResource(Object key) {
103 return resources.get(key);
104 }
105
106 public boolean getRollbackOnly() {
107 return status == Status.STATUS_MARKED_ROLLBACK;
108 }
109
110 public Object getTransactionKey() {
111 return xid;
112 }
113
114 public int getTransactionStatus() {
115 return status;
116 }
117
118 public void putResource(Object key, Object value) {
119 if (key == null) {
120 throw new NullPointerException("You must supply a non-null key for putResource");
121 }
122 resources.put(key, value);
123 }
124
125 public void registerInterposedSynchronization(Synchronization synchronization) {
126 interposedSynchronization = synchronization;
127 }
128
129 public synchronized void setRollbackOnly() throws IllegalStateException {
130 switch (status) {
131 case Status.STATUS_ACTIVE:
132 case Status.STATUS_PREPARING:
133 status = Status.STATUS_MARKED_ROLLBACK;
134 break;
135 case Status.STATUS_MARKED_ROLLBACK:
136 case Status.STATUS_ROLLING_BACK:
137 // nothing to do
138 break;
139 default:
140 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
141 }
142 }
143
144 public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
145 if (synch == null) {
146 throw new IllegalArgumentException("Synchronization is null");
147 }
148 switch (status) {
149 case Status.STATUS_ACTIVE:
150 case Status.STATUS_PREPARING:
151 break;
152 case Status.STATUS_MARKED_ROLLBACK:
153 throw new RollbackException("Transaction is marked for rollback");
154 default:
155 throw new IllegalStateException("Status is " + getStateString(status));
156 }
157 syncList.add(synch);
158 }
159
160 public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
161 if (xaRes == null) {
162 throw new IllegalArgumentException("XAResource is null");
163 }
164 switch (status) {
165 case Status.STATUS_ACTIVE:
166 break;
167 case Status.STATUS_MARKED_ROLLBACK:
168 throw new RollbackException("Transaction is marked for rollback");
169 default:
170 throw new IllegalStateException("Status is " + getStateString(status));
171 }
172
173 if (activeXaResources.containsKey(xaRes)) {
174 throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
175 }
176
177 try {
178 TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
179 if (manager != null) {
180 //we know about this one, it was suspended
181 xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
182 activeXaResources.put(xaRes, manager);
183 return true;
184 }
185 //it is not suspended.
186 for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
187 manager = (TransactionBranch) i.next();
188 boolean sameRM;
189 //if the xares is already known, we must be resuming after a suspend.
190 if (xaRes == manager.getCommitter()) {
191 throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
192 }
193 //Otherwise, see if this is a new xares for the same resource manager
194 try {
195 sameRM = xaRes.isSameRM(manager.getCommitter());
196 } catch (XAException e) {
197 log.warn("Unexpected error checking for same RM", e);
198 continue;
199 }
200 if (sameRM) {
201 xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
202 activeXaResources.put(xaRes, manager);
203 return true;
204 }
205 }
206 //we know nothing about this XAResource or resource manager
207 Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
208 xaRes.start(branchId, XAResource.TMNOFLAGS);
209 activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
210 return true;
211 } catch (XAException e) {
212 log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
213 return false;
214 }
215 }
216
217 public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
218 if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
219 throw new IllegalStateException("invalid flag for delistResource: " + flag);
220 }
221 if (xaRes == null) {
222 throw new IllegalArgumentException("XAResource is null");
223 }
224 switch (status) {
225 case Status.STATUS_ACTIVE:
226 case Status.STATUS_MARKED_ROLLBACK:
227 break;
228 default:
229 throw new IllegalStateException("Status is " + getStateString(status));
230 }
231 TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
232 if (manager == null) {
233 if (flag == XAResource.TMSUSPEND) {
234 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
235 }
236 //not active, and we are not trying to suspend. We must be ending tx.
237 manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
238 if (manager == null) {
239 throw new IllegalStateException("Resource not known to transaction: " + xaRes);
240 }
241 }
242
243 try {
244 xaRes.end(manager.getBranchId(), flag);
245 if (flag == XAResource.TMSUSPEND) {
246 suspendedXaResources.put(xaRes, manager);
247 }
248 return true;
249 } catch (XAException e) {
250 log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
251 return false;
252 }
253 }
254
255 //Transaction method, does 2pc
256 public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
257 beforePrepare();
258
259 try {
260 boolean timedout = false;
261 if (TransactionTimer.getCurrentTime() > timeout) {
262 status = Status.STATUS_MARKED_ROLLBACK;
263 timedout = true;
264 }
265
266 if (status == Status.STATUS_MARKED_ROLLBACK) {
267 rollbackResources(resourceManagers);
268 if (timedout) {
269 throw new RollbackException("Transaction timout");
270 } else {
271 throw new RollbackException("Unable to commit: transaction marked for rollback");
272 }
273 }
274 synchronized (this) {
275 if (status == Status.STATUS_ACTIVE) {
276 if (this.resourceManagers.size() == 0) {
277 // nothing to commit
278 status = Status.STATUS_COMMITTED;
279 } else if (this.resourceManagers.size() == 1) {
280 // one-phase commit decision
281 status = Status.STATUS_COMMITTING;
282 } else {
283 // start prepare part of two-phase
284 status = Status.STATUS_PREPARING;
285 }
286 }
287 // resourceManagers is now immutable
288 }
289
290 // no-phase
291 if (resourceManagers.size() == 0) {
292 synchronized (this) {
293 status = Status.STATUS_COMMITTED;
294 }
295 return;
296 }
297
298 // one-phase
299 if (resourceManagers.size() == 1) {
300 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
301 try {
302 manager.getCommitter().commit(manager.getBranchId(), true);
303 synchronized (this) {
304 status = Status.STATUS_COMMITTED;
305 }
306 return;
307 } catch (XAException e) {
308 synchronized (this) {
309 status = Status.STATUS_ROLLEDBACK;
310 }
311 throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
312 }
313 }
314
315 // two-phase
316 boolean willCommit = internalPrepare();
317
318 // notify the RMs
319 if (willCommit) {
320 commitResources(resourceManagers);
321 } else {
322 rollbackResources(resourceManagers);
323 throw new RollbackException("Unable to commit");
324 }
325 } finally {
326 afterCompletion();
327 synchronized (this) {
328 status = Status.STATUS_NO_TRANSACTION;
329 }
330 }
331 }
332
333 //Used from XATerminator for first phase in a remotely controlled tx.
334 int prepare() throws SystemException, RollbackException {
335 beforePrepare();
336 int result = XAResource.XA_RDONLY;
337 try {
338 LinkedList rms;
339 synchronized (this) {
340 if (status == Status.STATUS_ACTIVE) {
341 if (resourceManagers.size() == 0) {
342 // nothing to commit
343 status = Status.STATUS_COMMITTED;
344 return result;
345 } else {
346 // start prepare part of two-phase
347 status = Status.STATUS_PREPARING;
348 }
349 }
350 // resourceManagers is now immutable
351 rms = resourceManagers;
352 }
353
354 boolean willCommit = internalPrepare();
355
356 // notify the RMs
357 if (willCommit) {
358 if (!rms.isEmpty()) {
359 result = XAResource.XA_OK;
360 }
361 } else {
362 rollbackResources(rms);
363 throw new RollbackException("Unable to commit");
364 }
365 } finally {
366 if (result == XAResource.XA_RDONLY) {
367 afterCompletion();
368 synchronized (this) {
369 status = Status.STATUS_NO_TRANSACTION;
370 }
371 }
372 }
373 return result;
374 }
375
376 //used from XATerminator for commit phase of non-readonly remotely controlled tx.
377 void preparedCommit() throws SystemException {
378 try {
379 commitResources(resourceManagers);
380 } finally {
381 afterCompletion();
382 synchronized (this) {
383 status = Status.STATUS_NO_TRANSACTION;
384 }
385 }
386 }
387
388 //helper method used by Transaction.commit and XATerminator prepare.
389 private void beforePrepare() {
390 synchronized (this) {
391 switch (status) {
392 case Status.STATUS_ACTIVE:
393 case Status.STATUS_MARKED_ROLLBACK:
394 break;
395 default:
396 throw new IllegalStateException("Status is " + getStateString(status));
397 }
398 }
399
400 beforeCompletion();
401 endResources();
402 }
403
404
405 //helper method used by Transaction.commit and XATerminator prepare.
406 private boolean internalPrepare() throws SystemException {
407
408 for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
409 synchronized (this) {
410 if (status != Status.STATUS_PREPARING) {
411 // we were marked for rollback
412 break;
413 }
414 }
415 TransactionBranch manager = (TransactionBranch) rms.next();
416 try {
417 int vote = manager.getCommitter().prepare(manager.getBranchId());
418 if (vote == XAResource.XA_RDONLY) {
419 // we don't need to consider this RM any more
420 rms.remove();
421 }
422 } catch (XAException e) {
423 synchronized (this) {
424 status = Status.STATUS_MARKED_ROLLBACK;
425 //TODO document why this is true from the spec.
426 //XAException during prepare means we can assume resource is rolled back.
427 rms.remove();
428 break;
429 }
430 }
431 }
432
433 // decision time...
434 boolean willCommit;
435 synchronized (this) {
436 willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
437 if (willCommit) {
438 status = Status.STATUS_PREPARED;
439 }
440 }
441 // log our decision
442 if (willCommit && !resourceManagers.isEmpty()) {
443 try {
444 logMark = txnLog.prepare(xid, resourceManagers);
445 } catch (LogException e) {
446 try {
447 rollbackResources(resourceManagers);
448 } catch (Exception se) {
449 log.error("Unable to rollback after failure to log prepare", se.getCause());
450 }
451 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
452 }
453 }
454 return willCommit;
455 }
456
457 public void rollback() throws IllegalStateException, SystemException {
458 List rms;
459 synchronized (this) {
460 switch (status) {
461 case Status.STATUS_ACTIVE:
462 status = Status.STATUS_MARKED_ROLLBACK;
463 break;
464 case Status.STATUS_MARKED_ROLLBACK:
465 break;
466 default:
467 throw new IllegalStateException("Status is " + getStateString(status));
468 }
469 rms = resourceManagers;
470 }
471
472 beforeCompletion();
473 endResources();
474 try {
475 rollbackResources(rms);
476 //only write rollback record if we have already written prepare record.
477 if (logMark != null) {
478 try {
479 txnLog.rollback(xid, logMark);
480 } catch (LogException e) {
481 try {
482 rollbackResources(rms);
483 } catch (Exception se) {
484 log.error("Unable to rollback after failure to log decision", se.getCause());
485 }
486 throw (SystemException) new SystemException("Error logging rollback").initCause(e);
487 }
488 }
489 } finally {
490 afterCompletion();
491 synchronized (this) {
492 status = Status.STATUS_NO_TRANSACTION;
493 }
494 }
495 }
496
497 private void beforeCompletion() {
498 int i = 0;
499 while (true) {
500 Synchronization synch;
501 synchronized (this) {
502 if (i == syncList.size()) {
503 if (interposedSynchronization != null) {
504 synch = interposedSynchronization;
505 i++;
506 } else {
507 return;
508 }
509 } else if (i == syncList.size() + 1) {
510 return;
511 } else {
512 synch = (Synchronization) syncList.get(i++);
513 }
514 }
515 try {
516 synch.beforeCompletion();
517 } catch (Exception e) {
518 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
519 synchronized (this) {
520 status = Status.STATUS_MARKED_ROLLBACK;
521 }
522 }
523 }
524 }
525
526 private void afterCompletion() {
527 // this does not synchronize because nothing can modify our state at this time
528 if (interposedSynchronization != null) {
529 try {
530 interposedSynchronization.afterCompletion(status);
531 } catch (Exception e) {
532 log.warn("Unexpected exception from afterCompletion; continuing", e);
533 }
534 }
535 for (Iterator i = syncList.iterator(); i.hasNext();) {
536 Synchronization synch = (Synchronization) i.next();
537 try {
538 synch.afterCompletion(status);
539 } catch (Exception e) {
540 log.warn("Unexpected exception from afterCompletion; continuing", e);
541 continue;
542 }
543 }
544 for (Iterator i = entityManagers.values().iterator(); i.hasNext();) {
545 Closeable entityManager = (Closeable) i.next();
546 entityManager.close();
547 }
548 }
549
550 private void endResources() {
551 endResources(activeXaResources);
552 endResources(suspendedXaResources);
553 }
554
555 private void endResources(IdentityHashMap resourceMap) {
556 while (true) {
557 XAResource xaRes;
558 TransactionBranch manager;
559 int flags;
560 synchronized (this) {
561 Set entrySet = resourceMap.entrySet();
562 if (entrySet.isEmpty()) {
563 return;
564 }
565 Map.Entry entry = (Map.Entry) entrySet.iterator().next();
566 xaRes = (XAResource) entry.getKey();
567 manager = (TransactionBranch) entry.getValue();
568 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
569 resourceMap.remove(xaRes);
570 }
571 try {
572 xaRes.end(manager.getBranchId(), flags);
573 } catch (XAException e) {
574 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
575 synchronized (this) {
576 status = Status.STATUS_MARKED_ROLLBACK;
577 }
578 }
579 }
580 }
581
582 private void rollbackResources(List rms) throws SystemException {
583 SystemException cause = null;
584 synchronized (this) {
585 status = Status.STATUS_ROLLING_BACK;
586 }
587 for (Iterator i = rms.iterator(); i.hasNext();) {
588 TransactionBranch manager = (TransactionBranch) i.next();
589 try {
590 manager.getCommitter().rollback(manager.getBranchId());
591 } catch (XAException e) {
592 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
593 if (cause == null) {
594 cause = new SystemException(e.errorCode);
595 }
596 continue;
597 }
598 }
599 synchronized (this) {
600 status = Status.STATUS_ROLLEDBACK;
601 }
602 if (cause != null) {
603 throw cause;
604 }
605 }
606
607 private void commitResources(List rms) throws SystemException {
608 SystemException cause = null;
609 synchronized (this) {
610 status = Status.STATUS_COMMITTING;
611 }
612 for (Iterator i = rms.iterator(); i.hasNext();) {
613 TransactionBranch manager = (TransactionBranch) i.next();
614 try {
615 manager.getCommitter().commit(manager.getBranchId(), false);
616 } catch (XAException e) {
617 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
618 if (cause == null) {
619 cause = new SystemException(e.errorCode);
620 }
621 continue;
622 }
623 }
624 //if all resources were read only, we didn't write a prepare record.
625 if (!rms.isEmpty()) {
626 try {
627 txnLog.commit(xid, logMark);
628 } catch (LogException e) {
629 log.error("Unexpected exception logging commit completion for xid " + xid, e);
630 throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
631 }
632 }
633 synchronized (this) {
634 status = Status.STATUS_COMMITTED;
635 }
636 if (cause != null) {
637 throw cause;
638 }
639 }
640
641 private static String getStateString(int status) {
642 switch (status) {
643 case Status.STATUS_ACTIVE:
644 return "STATUS_ACTIVE";
645 case Status.STATUS_PREPARING:
646 return "STATUS_PREPARING";
647 case Status.STATUS_PREPARED:
648 return "STATUS_PREPARED";
649 case Status.STATUS_MARKED_ROLLBACK:
650 return "STATUS_MARKED_ROLLBACK";
651 case Status.STATUS_ROLLING_BACK:
652 return "STATUS_ROLLING_BACK";
653 case Status.STATUS_COMMITTING:
654 return "STATUS_COMMITTING";
655 case Status.STATUS_COMMITTED:
656 return "STATUS_COMMITTED";
657 case Status.STATUS_ROLLEDBACK:
658 return "STATUS_ROLLEDBACK";
659 case Status.STATUS_NO_TRANSACTION:
660 return "STATUS_NO_TRANSACTION";
661 case Status.STATUS_UNKNOWN:
662 return "STATUS_UNKNOWN";
663 default:
664 throw new AssertionError();
665 }
666 }
667
668 public boolean equals(Object obj) {
669 if (obj instanceof TransactionImpl) {
670 TransactionImpl other = (TransactionImpl) obj;
671 return xid.equals(other.xid);
672 } else {
673 return false;
674 }
675 }
676
677 //when used from recovery, do not add manager to active or suspended resource maps.
678 // The xaresources have already been ended with TMSUCCESS.
679 public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
680 TransactionBranch manager = new TransactionBranch(xaRes, branchId);
681 resourceManagers.add(manager);
682 return manager;
683 }
684
685 public Object getEntityManager(String persistenceUnit) {
686 return entityManagers.get(persistenceUnit);
687 }
688
689 public void setEntityManager(String persistenceUnit, Object entityManager) {
690 Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager);
691 if (oldEntityManager != null) {
692 throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid);
693 }
694 }
695
696 private static class TransactionBranch implements TransactionBranchInfo {
697 private final XAResource committer;
698 private final Xid branchId;
699
700 public TransactionBranch(XAResource xaRes, Xid branchId) {
701 committer = xaRes;
702 this.branchId = branchId;
703 }
704
705 public XAResource getCommitter() {
706 return committer;
707 }
708
709 public Xid getBranchId() {
710 return branchId;
711 }
712
713 public String getResourceName() {
714 if (committer instanceof NamedXAResource) {
715 return ((NamedXAResource) committer).getName();
716 } else {
717 throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer);
718 }
719 }
720
721 public Xid getBranchXid() {
722 return branchId;
723 }
724 }
725
726
727 }