1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one or more
4 * contributor license agreements. See the NOTICE file distributed with
5 * this work for additional information regarding copyright ownership.
6 * The ASF licenses this file to You under the Apache License, Version 2.0
7 * (the "License"); you may not use this file except in compliance with
8 * the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.geronimo.transaction.manager;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.IdentityHashMap;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29
30 import javax.transaction.HeuristicMixedException;
31 import javax.transaction.HeuristicRollbackException;
32 import javax.transaction.RollbackException;
33 import javax.transaction.Status;
34 import javax.transaction.Synchronization;
35 import javax.transaction.SystemException;
36 import javax.transaction.Transaction;
37 import javax.transaction.xa.XAException;
38 import javax.transaction.xa.XAResource;
39 import javax.transaction.xa.Xid;
40 import javax.ejb.EJBException;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44
45 /**
46 * Basic local transaction with support for multiple resources.
47 *
48 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
49 */
50 public class TransactionImpl implements Transaction {
51 private static final Log log = LogFactory.getLog("Transaction");
52
53 private final XidFactory xidFactory;
54 private final Xid xid;
55 private final TransactionLog txnLog;
56 private final long timeout;
57 private final List syncList = new ArrayList(5);
58 private final LinkedList resourceManagers = new LinkedList();
59 private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
60 private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
61 private int status = Status.STATUS_NO_TRANSACTION;
62 private Object logMark;
63
64 private final Map resources = new HashMap();
65 private Synchronization interposedSynchronization;
66 private final Map entityManagers = new HashMap();
67
/**
 * Creates a transaction whose Xid is obtained from {@code xidFactory.createXid()}
 * and delegates to the four-argument constructor.
 *
 * @param xidFactory factory used for the transaction Xid and, later, branch Xids
 * @param txnLog transaction log that is told about the begin of the transaction
 * @param transactionTimeoutMilliseconds added to the current timer value to form the deadline
 * @throws SystemException if the delegate constructor fails to log the begin
 */
TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
    this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
}
71
72 TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
73 this.xidFactory = xidFactory;
74 this.txnLog = txnLog;
75 this.xid = xid;
76 this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
77 try {
78 txnLog.begin(xid);
79 } catch (LogException e) {
80 status = Status.STATUS_MARKED_ROLLBACK;
81 SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
82 ex.initCause(e);
83 throw ex;
84 }
85 status = Status.STATUS_ACTIVE;
86 }
87
88
/**
 * Creates a transaction that starts out in STATUS_PREPARED.
 * No XidFactory is available on this path, nothing is written to the log,
 * and the deadline is {@code Long.MAX_VALUE}.
 *
 * @param xid identity of the transaction
 * @param txLog transaction log used for later commit/rollback records
 */
public TransactionImpl(Xid xid, TransactionLog txLog) {
    this.xid = xid;
    this.txnLog = txLog;
    this.xidFactory = null;
    this.timeout = Long.MAX_VALUE;
    status = Status.STATUS_PREPARED;
}
97
98 public synchronized int getStatus() {
99 return status;
100 }
101
102 public Object getResource(Object key) {
103 return resources.get(key);
104 }
105
106 public boolean getRollbackOnly() {
107 return status == Status.STATUS_MARKED_ROLLBACK;
108 }
109
110 public Object getTransactionKey() {
111 return xid;
112 }
113
114 public int getTransactionStatus() {
115 return status;
116 }
117
118 public void putResource(Object key, Object value) {
119 if (key == null) {
120 throw new NullPointerException("You must supply a non-null key for putResource");
121 }
122 resources.put(key, value);
123 }
124
125 public void registerInterposedSynchronization(Synchronization synchronization) {
126 interposedSynchronization = synchronization;
127 }
128
129 public synchronized void setRollbackOnly() throws IllegalStateException {
130 switch (status) {
131 case Status.STATUS_ACTIVE:
132 case Status.STATUS_PREPARING:
133 status = Status.STATUS_MARKED_ROLLBACK;
134 break;
135 case Status.STATUS_MARKED_ROLLBACK:
136 case Status.STATUS_ROLLING_BACK:
137
138 break;
139 default:
140 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
141 }
142 }
143
144 public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
145 if (synch == null) {
146 throw new IllegalArgumentException("Synchronization is null");
147 }
148 switch (status) {
149 case Status.STATUS_ACTIVE:
150 case Status.STATUS_PREPARING:
151 break;
152 case Status.STATUS_MARKED_ROLLBACK:
153 throw new RollbackException("Transaction is marked for rollback");
154 default:
155 throw new IllegalStateException("Status is " + getStateString(status));
156 }
157 syncList.add(synch);
158 }
159
160 public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
161 if (xaRes == null) {
162 throw new IllegalArgumentException("XAResource is null");
163 }
164 switch (status) {
165 case Status.STATUS_ACTIVE:
166 break;
167 case Status.STATUS_MARKED_ROLLBACK:
168 throw new RollbackException("Transaction is marked for rollback");
169 default:
170 throw new IllegalStateException("Status is " + getStateString(status));
171 }
172
173 if (activeXaResources.containsKey(xaRes)) {
174 throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
175 }
176
177 try {
178 TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
179 if (manager != null) {
180
181 xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
182 activeXaResources.put(xaRes, manager);
183 return true;
184 }
185
186 for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
187 manager = (TransactionBranch) i.next();
188 boolean sameRM;
189
190 if (xaRes == manager.getCommitter()) {
191 throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
192 }
193
194 try {
195 sameRM = xaRes.isSameRM(manager.getCommitter());
196 } catch (XAException e) {
197 log.warn("Unexpected error checking for same RM", e);
198 continue;
199 }
200 if (sameRM) {
201 xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
202 activeXaResources.put(xaRes, manager);
203 return true;
204 }
205 }
206
207 Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
208 xaRes.start(branchId, XAResource.TMNOFLAGS);
209 activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
210 return true;
211 } catch (XAException e) {
212 log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
213 return false;
214 }
215 }
216
217 public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
218 if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
219 throw new IllegalStateException("invalid flag for delistResource: " + flag);
220 }
221 if (xaRes == null) {
222 throw new IllegalArgumentException("XAResource is null");
223 }
224 switch (status) {
225 case Status.STATUS_ACTIVE:
226 case Status.STATUS_MARKED_ROLLBACK:
227 break;
228 default:
229 throw new IllegalStateException("Status is " + getStateString(status));
230 }
231 TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
232 if (manager == null) {
233 if (flag == XAResource.TMSUSPEND) {
234 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
235 }
236
237 manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
238 if (manager == null) {
239 throw new IllegalStateException("Resource not known to transaction: " + xaRes);
240 }
241 }
242
243 try {
244 xaRes.end(manager.getBranchId(), flag);
245 if (flag == XAResource.TMSUSPEND) {
246 suspendedXaResources.put(xaRes, manager);
247 }
248 return true;
249 } catch (XAException e) {
250 log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
251 return false;
252 }
253 }
254
255
256 public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
257 beforePrepare();
258
259 try {
260 boolean timedout = false;
261 if (TransactionTimer.getCurrentTime() > timeout) {
262 status = Status.STATUS_MARKED_ROLLBACK;
263 timedout = true;
264 }
265
266 if (status == Status.STATUS_MARKED_ROLLBACK) {
267 rollbackResources(resourceManagers);
268 if (timedout) {
269 throw new RollbackException("Transaction timout");
270 } else {
271 throw new RollbackException("Unable to commit: transaction marked for rollback");
272 }
273 }
274 synchronized (this) {
275 if (status == Status.STATUS_ACTIVE) {
276 if (this.resourceManagers.size() == 0) {
277
278 status = Status.STATUS_COMMITTED;
279 } else if (this.resourceManagers.size() == 1) {
280
281 status = Status.STATUS_COMMITTING;
282 } else {
283
284 status = Status.STATUS_PREPARING;
285 }
286 }
287
288 }
289
290
291 if (resourceManagers.size() == 0) {
292 synchronized (this) {
293 status = Status.STATUS_COMMITTED;
294 }
295 return;
296 }
297
298
299 if (resourceManagers.size() == 1) {
300 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
301 try {
302 manager.getCommitter().commit(manager.getBranchId(), true);
303 synchronized (this) {
304 status = Status.STATUS_COMMITTED;
305 }
306 return;
307 } catch (XAException e) {
308 synchronized (this) {
309 status = Status.STATUS_ROLLEDBACK;
310 }
311 throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
312 }
313 }
314
315
316 boolean willCommit = internalPrepare();
317
318
319 if (willCommit) {
320 commitResources(resourceManagers);
321 } else {
322 rollbackResources(resourceManagers);
323 throw new RollbackException("Unable to commit");
324 }
325 } finally {
326 afterCompletion();
327 synchronized (this) {
328 status = Status.STATUS_NO_TRANSACTION;
329 }
330 }
331 }
332
333
334 int prepare() throws SystemException, RollbackException {
335 beforePrepare();
336 int result = XAResource.XA_RDONLY;
337 try {
338 LinkedList rms;
339 synchronized (this) {
340 if (status == Status.STATUS_ACTIVE) {
341 if (resourceManagers.size() == 0) {
342
343 status = Status.STATUS_COMMITTED;
344 return result;
345 } else {
346
347 status = Status.STATUS_PREPARING;
348 }
349 }
350
351 rms = resourceManagers;
352 }
353
354 boolean willCommit = internalPrepare();
355
356
357 if (willCommit) {
358 if (!rms.isEmpty()) {
359 result = XAResource.XA_OK;
360 }
361 } else {
362 rollbackResources(rms);
363 throw new RollbackException("Unable to commit");
364 }
365 } finally {
366 if (result == XAResource.XA_RDONLY) {
367 afterCompletion();
368 synchronized (this) {
369 status = Status.STATUS_NO_TRANSACTION;
370 }
371 }
372 }
373 return result;
374 }
375
376
377 void preparedCommit() throws SystemException {
378 try {
379 commitResources(resourceManagers);
380 } finally {
381 afterCompletion();
382 synchronized (this) {
383 status = Status.STATUS_NO_TRANSACTION;
384 }
385 }
386 }
387
388
389 private void beforePrepare() {
390 synchronized (this) {
391 switch (status) {
392 case Status.STATUS_ACTIVE:
393 case Status.STATUS_MARKED_ROLLBACK:
394 break;
395 default:
396 throw new IllegalStateException("Status is " + getStateString(status));
397 }
398 }
399
400 beforeCompletion();
401 endResources();
402 }
403
404
405
406 private boolean internalPrepare() throws SystemException {
407
408 for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
409 synchronized (this) {
410 if (status != Status.STATUS_PREPARING) {
411
412 break;
413 }
414 }
415 TransactionBranch manager = (TransactionBranch) rms.next();
416 try {
417 int vote = manager.getCommitter().prepare(manager.getBranchId());
418 if (vote == XAResource.XA_RDONLY) {
419
420 rms.remove();
421 }
422 } catch (XAException e) {
423 synchronized (this) {
424 status = Status.STATUS_MARKED_ROLLBACK;
425
426
427 rms.remove();
428 break;
429 }
430 }
431 }
432
433
434 boolean willCommit;
435 synchronized (this) {
436 willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
437 if (willCommit) {
438 status = Status.STATUS_PREPARED;
439 }
440 }
441
442 if (willCommit && !resourceManagers.isEmpty()) {
443 try {
444 logMark = txnLog.prepare(xid, resourceManagers);
445 } catch (LogException e) {
446 try {
447 rollbackResources(resourceManagers);
448 } catch (Exception se) {
449 log.error("Unable to rollback after failure to log prepare", se.getCause());
450 }
451 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
452 }
453 }
454 return willCommit;
455 }
456
457 public void rollback() throws IllegalStateException, SystemException {
458 List rms;
459 synchronized (this) {
460 switch (status) {
461 case Status.STATUS_ACTIVE:
462 status = Status.STATUS_MARKED_ROLLBACK;
463 break;
464 case Status.STATUS_MARKED_ROLLBACK:
465 break;
466 default:
467 throw new IllegalStateException("Status is " + getStateString(status));
468 }
469 rms = resourceManagers;
470 }
471
472 beforeCompletion();
473 endResources();
474 try {
475 rollbackResources(rms);
476
477 if (logMark != null) {
478 try {
479 txnLog.rollback(xid, logMark);
480 } catch (LogException e) {
481 try {
482 rollbackResources(rms);
483 } catch (Exception se) {
484 log.error("Unable to rollback after failure to log decision", se.getCause());
485 }
486 throw (SystemException) new SystemException("Error logging rollback").initCause(e);
487 }
488 }
489 } finally {
490 afterCompletion();
491 synchronized (this) {
492 status = Status.STATUS_NO_TRANSACTION;
493 }
494 }
495 }
496
497 private void beforeCompletion() {
498 int i = 0;
499 while (true) {
500 Synchronization synch;
501 synchronized (this) {
502 if (i == syncList.size()) {
503 if (interposedSynchronization != null) {
504 synch = interposedSynchronization;
505 i++;
506 } else {
507 return;
508 }
509 } else if (i == syncList.size() + 1) {
510 return;
511 } else {
512 synch = (Synchronization) syncList.get(i++);
513 }
514 }
515 try {
516 synch.beforeCompletion();
517 } catch (Exception e) {
518 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
519 synchronized (this) {
520 status = Status.STATUS_MARKED_ROLLBACK;
521 }
522 }
523 }
524 }
525
526 private void afterCompletion() {
527
528 if (interposedSynchronization != null) {
529 try {
530 interposedSynchronization.afterCompletion(status);
531 } catch (Exception e) {
532 log.warn("Unexpected exception from afterCompletion; continuing", e);
533 }
534 }
535 for (Iterator i = syncList.iterator(); i.hasNext();) {
536 Synchronization synch = (Synchronization) i.next();
537 try {
538 synch.afterCompletion(status);
539 } catch (Exception e) {
540 log.warn("Unexpected exception from afterCompletion; continuing", e);
541 continue;
542 }
543 }
544 for (Iterator i = entityManagers.values().iterator(); i.hasNext();) {
545 Closeable entityManager = (Closeable) i.next();
546 entityManager.close();
547 }
548 }
549
550 private void endResources() {
551 endResources(activeXaResources);
552 endResources(suspendedXaResources);
553 }
554
555 private void endResources(IdentityHashMap resourceMap) {
556 while (true) {
557 XAResource xaRes;
558 TransactionBranch manager;
559 int flags;
560 synchronized (this) {
561 Set entrySet = resourceMap.entrySet();
562 if (entrySet.isEmpty()) {
563 return;
564 }
565 Map.Entry entry = (Map.Entry) entrySet.iterator().next();
566 xaRes = (XAResource) entry.getKey();
567 manager = (TransactionBranch) entry.getValue();
568 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
569 resourceMap.remove(xaRes);
570 }
571 try {
572 xaRes.end(manager.getBranchId(), flags);
573 } catch (XAException e) {
574 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
575 synchronized (this) {
576 status = Status.STATUS_MARKED_ROLLBACK;
577 }
578 }
579 }
580 }
581
582 private void rollbackResources(List rms) throws SystemException {
583 SystemException cause = null;
584 synchronized (this) {
585 status = Status.STATUS_ROLLING_BACK;
586 }
587 for (Iterator i = rms.iterator(); i.hasNext();) {
588 TransactionBranch manager = (TransactionBranch) i.next();
589 try {
590 manager.getCommitter().rollback(manager.getBranchId());
591 } catch (XAException e) {
592 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
593 if (cause == null) {
594 cause = new SystemException(e.errorCode);
595 }
596 continue;
597 }
598 }
599 synchronized (this) {
600 status = Status.STATUS_ROLLEDBACK;
601 }
602 if (cause != null) {
603 throw cause;
604 }
605 }
606
607 private void commitResources(List rms) throws SystemException {
608 SystemException cause = null;
609 synchronized (this) {
610 status = Status.STATUS_COMMITTING;
611 }
612 for (Iterator i = rms.iterator(); i.hasNext();) {
613 TransactionBranch manager = (TransactionBranch) i.next();
614 try {
615 manager.getCommitter().commit(manager.getBranchId(), false);
616 } catch (XAException e) {
617 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
618 if (cause == null) {
619 cause = new SystemException(e.errorCode);
620 }
621 continue;
622 }
623 }
624
625 if (!rms.isEmpty()) {
626 try {
627 txnLog.commit(xid, logMark);
628 } catch (LogException e) {
629 log.error("Unexpected exception logging commit completion for xid " + xid, e);
630 throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
631 }
632 }
633 synchronized (this) {
634 status = Status.STATUS_COMMITTED;
635 }
636 if (cause != null) {
637 throw cause;
638 }
639 }
640
641 private static String getStateString(int status) {
642 switch (status) {
643 case Status.STATUS_ACTIVE:
644 return "STATUS_ACTIVE";
645 case Status.STATUS_PREPARING:
646 return "STATUS_PREPARING";
647 case Status.STATUS_PREPARED:
648 return "STATUS_PREPARED";
649 case Status.STATUS_MARKED_ROLLBACK:
650 return "STATUS_MARKED_ROLLBACK";
651 case Status.STATUS_ROLLING_BACK:
652 return "STATUS_ROLLING_BACK";
653 case Status.STATUS_COMMITTING:
654 return "STATUS_COMMITTING";
655 case Status.STATUS_COMMITTED:
656 return "STATUS_COMMITTED";
657 case Status.STATUS_ROLLEDBACK:
658 return "STATUS_ROLLEDBACK";
659 case Status.STATUS_NO_TRANSACTION:
660 return "STATUS_NO_TRANSACTION";
661 case Status.STATUS_UNKNOWN:
662 return "STATUS_UNKNOWN";
663 default:
664 throw new AssertionError();
665 }
666 }
667
668 public boolean equals(Object obj) {
669 if (obj instanceof TransactionImpl) {
670 TransactionImpl other = (TransactionImpl) obj;
671 return xid.equals(other.xid);
672 } else {
673 return false;
674 }
675 }
676
677
678
679 public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
680 TransactionBranch manager = new TransactionBranch(xaRes, branchId);
681 resourceManagers.add(manager);
682 return manager;
683 }
684
685 public Object getEntityManager(String persistenceUnit) {
686 return entityManagers.get(persistenceUnit);
687 }
688
689 public void setEntityManager(String persistenceUnit, Object entityManager) {
690 Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager);
691 if (oldEntityManager != null) {
692 throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid);
693 }
694 }
695
696 private static class TransactionBranch implements TransactionBranchInfo {
697 private final XAResource committer;
698 private final Xid branchId;
699
700 public TransactionBranch(XAResource xaRes, Xid branchId) {
701 committer = xaRes;
702 this.branchId = branchId;
703 }
704
705 public XAResource getCommitter() {
706 return committer;
707 }
708
709 public Xid getBranchId() {
710 return branchId;
711 }
712
713 public String getResourceName() {
714 if (committer instanceof NamedXAResource) {
715 return ((NamedXAResource) committer).getName();
716 } else {
717 throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer);
718 }
719 }
720
721 public Xid getBranchXid() {
722 return branchId;
723 }
724 }
725
726
727 }