001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.geronimo.transaction.manager; 020 021 import java.util.ArrayList; 022 import java.util.HashMap; 023 import java.util.IdentityHashMap; 024 import java.util.Iterator; 025 import java.util.LinkedList; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Set; 029 030 import javax.transaction.HeuristicMixedException; 031 import javax.transaction.HeuristicRollbackException; 032 import javax.transaction.RollbackException; 033 import javax.transaction.Status; 034 import javax.transaction.Synchronization; 035 import javax.transaction.SystemException; 036 import javax.transaction.Transaction; 037 import javax.transaction.xa.XAException; 038 import javax.transaction.xa.XAResource; 039 import javax.transaction.xa.Xid; 040 import javax.ejb.EJBException; 041 042 import org.apache.commons.logging.Log; 043 import org.apache.commons.logging.LogFactory; 044 045 /** 046 * Basic local transaction with support for multiple resources. 047 * 048 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $ 049 */ 050 public class TransactionImpl implements Transaction { 051 private static final Log log = LogFactory.getLog("Transaction"); 052 053 private final XidFactory xidFactory; 054 private final Xid xid; 055 private final TransactionLog txnLog; 056 private final long timeout; 057 private final List syncList = new ArrayList(5); 058 private final LinkedList resourceManagers = new LinkedList(); 059 private final IdentityHashMap activeXaResources = new IdentityHashMap(3); 060 private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3); 061 private int status = Status.STATUS_NO_TRANSACTION; 062 private Object logMark; 063 064 private final Map resources = new HashMap(); 065 private Synchronization interposedSynchronization; 066 private final Map entityManagers = new HashMap(); 067 068 TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException { 069 this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds); 070 } 071 072 TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException { 073 this.xidFactory = xidFactory; 074 this.txnLog = txnLog; 075 this.xid = xid; 076 this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime(); 077 try { 078 txnLog.begin(xid); 079 } catch (LogException e) { 080 status = Status.STATUS_MARKED_ROLLBACK; 081 SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)"); 082 ex.initCause(e); 083 throw ex; 084 } 085 status = Status.STATUS_ACTIVE; 086 } 087 088 //reconstruct a tx for an external tx found in recovery 089 public TransactionImpl(Xid xid, TransactionLog txLog) { 090 this.xidFactory = null; 091 this.txnLog = txLog; 092 this.xid = xid; 093 status = Status.STATUS_PREPARED; 094 //TODO is this a good idea? 095 this.timeout = Long.MAX_VALUE; 096 } 097 098 public synchronized int getStatus() { 099 return status; 100 } 101 102 public Object getResource(Object key) { 103 return resources.get(key); 104 } 105 106 public boolean getRollbackOnly() { 107 return status == Status.STATUS_MARKED_ROLLBACK; 108 } 109 110 public Object getTransactionKey() { 111 return xid; 112 } 113 114 public int getTransactionStatus() { 115 return status; 116 } 117 118 public void putResource(Object key, Object value) { 119 if (key == null) { 120 throw new NullPointerException("You must supply a non-null key for putResource"); 121 } 122 resources.put(key, value); 123 } 124 125 public void registerInterposedSynchronization(Synchronization synchronization) { 126 interposedSynchronization = synchronization; 127 } 128 129 public synchronized void setRollbackOnly() throws IllegalStateException { 130 switch (status) { 131 case Status.STATUS_ACTIVE: 132 case Status.STATUS_PREPARING: 133 status = Status.STATUS_MARKED_ROLLBACK; 134 break; 135 case Status.STATUS_MARKED_ROLLBACK: 136 case Status.STATUS_ROLLING_BACK: 137 // nothing to do 138 break; 139 default: 140 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status)); 141 } 142 } 143 144 public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException { 145 if (synch == null) { 146 throw new IllegalArgumentException("Synchronization is null"); 147 } 148 switch (status) { 149 case Status.STATUS_ACTIVE: 150 case Status.STATUS_PREPARING: 151 break; 152 case Status.STATUS_MARKED_ROLLBACK: 153 throw new RollbackException("Transaction is marked for rollback"); 154 default: 155 throw new IllegalStateException("Status is " + getStateString(status)); 156 } 157 syncList.add(synch); 158 } 159 160 public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException { 161 if (xaRes == null) { 162 throw new IllegalArgumentException("XAResource is null"); 163 } 164 switch (status) { 165 case Status.STATUS_ACTIVE: 166 break; 167 case Status.STATUS_MARKED_ROLLBACK: 168 throw new RollbackException("Transaction is marked for rollback"); 169 default: 170 throw new IllegalStateException("Status is " + getStateString(status)); 171 } 172 173 if (activeXaResources.containsKey(xaRes)) { 174 throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!"); 175 } 176 177 try { 178 TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes); 179 if (manager != null) { 180 //we know about this one, it was suspended 181 xaRes.start(manager.getBranchId(), XAResource.TMRESUME); 182 activeXaResources.put(xaRes, manager); 183 return true; 184 } 185 //it is not suspended. 186 for (Iterator i = resourceManagers.iterator(); i.hasNext();) { 187 manager = (TransactionBranch) i.next(); 188 boolean sameRM; 189 //if the xares is already known, we must be resuming after a suspend. 190 if (xaRes == manager.getCommitter()) { 191 throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended"); 192 } 193 //Otherwise, see if this is a new xares for the same resource manager 194 try { 195 sameRM = xaRes.isSameRM(manager.getCommitter()); 196 } catch (XAException e) { 197 log.warn("Unexpected error checking for same RM", e); 198 continue; 199 } 200 if (sameRM) { 201 xaRes.start(manager.getBranchId(), XAResource.TMJOIN); 202 activeXaResources.put(xaRes, manager); 203 return true; 204 } 205 } 206 //we know nothing about this XAResource or resource manager 207 Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1); 208 xaRes.start(branchId, XAResource.TMNOFLAGS); 209 activeXaResources.put(xaRes, addBranchXid(xaRes, branchId)); 210 return true; 211 } catch (XAException e) { 212 log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e); 213 return false; 214 } 215 } 216 217 public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException { 218 if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) { 219 throw new IllegalStateException("invalid flag for delistResource: " + flag); 220 } 221 if (xaRes == null) { 222 throw new IllegalArgumentException("XAResource is null"); 223 } 224 switch (status) { 225 case Status.STATUS_ACTIVE: 226 case Status.STATUS_MARKED_ROLLBACK: 227 break; 228 default: 229 throw new IllegalStateException("Status is " + getStateString(status)); 230 } 231 TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes); 232 if (manager == null) { 233 if (flag == XAResource.TMSUSPEND) { 234 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes); 235 } 236 //not active, and we are not trying to suspend. We must be ending tx. 237 manager = (TransactionBranch) suspendedXaResources.remove(xaRes); 238 if (manager == null) { 239 throw new IllegalStateException("Resource not known to transaction: " + xaRes); 240 } 241 } 242 243 try { 244 xaRes.end(manager.getBranchId(), flag); 245 if (flag == XAResource.TMSUSPEND) { 246 suspendedXaResources.put(xaRes, manager); 247 } 248 return true; 249 } catch (XAException e) { 250 log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e); 251 return false; 252 } 253 } 254 255 //Transaction method, does 2pc 256 public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException { 257 beforePrepare(); 258 259 try { 260 boolean timedout = false; 261 if (TransactionTimer.getCurrentTime() > timeout) { 262 status = Status.STATUS_MARKED_ROLLBACK; 263 timedout = true; 264 } 265 266 if (status == Status.STATUS_MARKED_ROLLBACK) { 267 rollbackResources(resourceManagers); 268 if (timedout) { 269 throw new RollbackException("Transaction timout"); 270 } else { 271 throw new RollbackException("Unable to commit: transaction marked for rollback"); 272 } 273 } 274 synchronized (this) { 275 if (status == Status.STATUS_ACTIVE) { 276 if (this.resourceManagers.size() == 0) { 277 // nothing to commit 278 status = Status.STATUS_COMMITTED; 279 } else if (this.resourceManagers.size() == 1) { 280 // one-phase commit decision 281 status = Status.STATUS_COMMITTING; 282 } else { 283 // start prepare part of two-phase 284 status = Status.STATUS_PREPARING; 285 } 286 } 287 // resourceManagers is now immutable 288 } 289 290 // no-phase 291 if (resourceManagers.size() == 0) { 292 synchronized (this) { 293 status = Status.STATUS_COMMITTED; 294 } 295 return; 296 } 297 298 // one-phase 299 if (resourceManagers.size() == 1) { 300 TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst(); 301 try { 302 manager.getCommitter().commit(manager.getBranchId(), true); 303 synchronized (this) { 304 status = Status.STATUS_COMMITTED; 305 } 306 return; 307 } catch (XAException e) { 308 synchronized (this) { 309 status = Status.STATUS_ROLLEDBACK; 310 } 311 throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e); 312 } 313 } 314 315 // two-phase 316 boolean willCommit = internalPrepare(); 317 318 // notify the RMs 319 if (willCommit) { 320 commitResources(resourceManagers); 321 } else { 322 rollbackResources(resourceManagers); 323 throw new RollbackException("Unable to commit"); 324 } 325 } finally { 326 afterCompletion(); 327 synchronized (this) { 328 status = Status.STATUS_NO_TRANSACTION; 329 } 330 } 331 } 332 333 //Used from XATerminator for first phase in a remotely controlled tx. 334 int prepare() throws SystemException, RollbackException { 335 beforePrepare(); 336 int result = XAResource.XA_RDONLY; 337 try { 338 LinkedList rms; 339 synchronized (this) { 340 if (status == Status.STATUS_ACTIVE) { 341 if (resourceManagers.size() == 0) { 342 // nothing to commit 343 status = Status.STATUS_COMMITTED; 344 return result; 345 } else { 346 // start prepare part of two-phase 347 status = Status.STATUS_PREPARING; 348 } 349 } 350 // resourceManagers is now immutable 351 rms = resourceManagers; 352 } 353 354 boolean willCommit = internalPrepare(); 355 356 // notify the RMs 357 if (willCommit) { 358 if (!rms.isEmpty()) { 359 result = XAResource.XA_OK; 360 } 361 } else { 362 rollbackResources(rms); 363 throw new RollbackException("Unable to commit"); 364 } 365 } finally { 366 if (result == XAResource.XA_RDONLY) { 367 afterCompletion(); 368 synchronized (this) { 369 status = Status.STATUS_NO_TRANSACTION; 370 } 371 } 372 } 373 return result; 374 } 375 376 //used from XATerminator for commit phase of non-readonly remotely controlled tx. 377 void preparedCommit() throws SystemException { 378 try { 379 commitResources(resourceManagers); 380 } finally { 381 afterCompletion(); 382 synchronized (this) { 383 status = Status.STATUS_NO_TRANSACTION; 384 } 385 } 386 } 387 388 //helper method used by Transaction.commit and XATerminator prepare. 389 private void beforePrepare() { 390 synchronized (this) { 391 switch (status) { 392 case Status.STATUS_ACTIVE: 393 case Status.STATUS_MARKED_ROLLBACK: 394 break; 395 default: 396 throw new IllegalStateException("Status is " + getStateString(status)); 397 } 398 } 399 400 beforeCompletion(); 401 endResources(); 402 } 403 404 405 //helper method used by Transaction.commit and XATerminator prepare. 406 private boolean internalPrepare() throws SystemException { 407 408 for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) { 409 synchronized (this) { 410 if (status != Status.STATUS_PREPARING) { 411 // we were marked for rollback 412 break; 413 } 414 } 415 TransactionBranch manager = (TransactionBranch) rms.next(); 416 try { 417 int vote = manager.getCommitter().prepare(manager.getBranchId()); 418 if (vote == XAResource.XA_RDONLY) { 419 // we don't need to consider this RM any more 420 rms.remove(); 421 } 422 } catch (XAException e) { 423 synchronized (this) { 424 status = Status.STATUS_MARKED_ROLLBACK; 425 //TODO document why this is true from the spec. 426 //XAException during prepare means we can assume resource is rolled back. 427 rms.remove(); 428 break; 429 } 430 } 431 } 432 433 // decision time... 434 boolean willCommit; 435 synchronized (this) { 436 willCommit = (status != Status.STATUS_MARKED_ROLLBACK); 437 if (willCommit) { 438 status = Status.STATUS_PREPARED; 439 } 440 } 441 // log our decision 442 if (willCommit && !resourceManagers.isEmpty()) { 443 try { 444 logMark = txnLog.prepare(xid, resourceManagers); 445 } catch (LogException e) { 446 try { 447 rollbackResources(resourceManagers); 448 } catch (Exception se) { 449 log.error("Unable to rollback after failure to log prepare", se.getCause()); 450 } 451 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e); 452 } 453 } 454 return willCommit; 455 } 456 457 public void rollback() throws IllegalStateException, SystemException { 458 List rms; 459 synchronized (this) { 460 switch (status) { 461 case Status.STATUS_ACTIVE: 462 status = Status.STATUS_MARKED_ROLLBACK; 463 break; 464 case Status.STATUS_MARKED_ROLLBACK: 465 break; 466 default: 467 throw new IllegalStateException("Status is " + getStateString(status)); 468 } 469 rms = resourceManagers; 470 } 471 472 beforeCompletion(); 473 endResources(); 474 try { 475 rollbackResources(rms); 476 //only write rollback record if we have already written prepare record. 477 if (logMark != null) { 478 try { 479 txnLog.rollback(xid, logMark); 480 } catch (LogException e) { 481 try { 482 rollbackResources(rms); 483 } catch (Exception se) { 484 log.error("Unable to rollback after failure to log decision", se.getCause()); 485 } 486 throw (SystemException) new SystemException("Error logging rollback").initCause(e); 487 } 488 } 489 } finally { 490 afterCompletion(); 491 synchronized (this) { 492 status = Status.STATUS_NO_TRANSACTION; 493 } 494 } 495 } 496 497 private void beforeCompletion() { 498 int i = 0; 499 while (true) { 500 Synchronization synch; 501 synchronized (this) { 502 if (i == syncList.size()) { 503 if (interposedSynchronization != null) { 504 synch = interposedSynchronization; 505 i++; 506 } else { 507 return; 508 } 509 } else if (i == syncList.size() + 1) { 510 return; 511 } else { 512 synch = (Synchronization) syncList.get(i++); 513 } 514 } 515 try { 516 synch.beforeCompletion(); 517 } catch (Exception e) { 518 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e); 519 synchronized (this) { 520 status = Status.STATUS_MARKED_ROLLBACK; 521 } 522 } 523 } 524 } 525 526 private void afterCompletion() { 527 // this does not synchronize because nothing can modify our state at this time 528 if (interposedSynchronization != null) { 529 try { 530 interposedSynchronization.afterCompletion(status); 531 } catch (Exception e) { 532 log.warn("Unexpected exception from afterCompletion; continuing", e); 533 } 534 } 535 for (Iterator i = syncList.iterator(); i.hasNext();) { 536 Synchronization synch = (Synchronization) i.next(); 537 try { 538 synch.afterCompletion(status); 539 } catch (Exception e) { 540 log.warn("Unexpected exception from afterCompletion; continuing", e); 541 continue; 542 } 543 } 544 for (Iterator i = entityManagers.values().iterator(); i.hasNext();) { 545 Closeable entityManager = (Closeable) i.next(); 546 entityManager.close(); 547 } 548 } 549 550 private void endResources() { 551 endResources(activeXaResources); 552 endResources(suspendedXaResources); 553 } 554 555 private void endResources(IdentityHashMap resourceMap) { 556 while (true) { 557 XAResource xaRes; 558 TransactionBranch manager; 559 int flags; 560 synchronized (this) { 561 Set entrySet = resourceMap.entrySet(); 562 if (entrySet.isEmpty()) { 563 return; 564 } 565 Map.Entry entry = (Map.Entry) entrySet.iterator().next(); 566 xaRes = (XAResource) entry.getKey(); 567 manager = (TransactionBranch) entry.getValue(); 568 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS; 569 resourceMap.remove(xaRes); 570 } 571 try { 572 xaRes.end(manager.getBranchId(), flags); 573 } catch (XAException e) { 574 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e); 575 synchronized (this) { 576 status = Status.STATUS_MARKED_ROLLBACK; 577 } 578 } 579 } 580 } 581 582 private void rollbackResources(List rms) throws SystemException { 583 SystemException cause = null; 584 synchronized (this) { 585 status = Status.STATUS_ROLLING_BACK; 586 } 587 for (Iterator i = rms.iterator(); i.hasNext();) { 588 TransactionBranch manager = (TransactionBranch) i.next(); 589 try { 590 manager.getCommitter().rollback(manager.getBranchId()); 591 } catch (XAException e) { 592 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e); 593 if (cause == null) { 594 cause = new SystemException(e.errorCode); 595 } 596 continue; 597 } 598 } 599 synchronized (this) { 600 status = Status.STATUS_ROLLEDBACK; 601 } 602 if (cause != null) { 603 throw cause; 604 } 605 } 606 607 private void commitResources(List rms) throws SystemException { 608 SystemException cause = null; 609 synchronized (this) { 610 status = Status.STATUS_COMMITTING; 611 } 612 for (Iterator i = rms.iterator(); i.hasNext();) { 613 TransactionBranch manager = (TransactionBranch) i.next(); 614 try { 615 manager.getCommitter().commit(manager.getBranchId(), false); 616 } catch (XAException e) { 617 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e); 618 if (cause == null) { 619 cause = new SystemException(e.errorCode); 620 } 621 continue; 622 } 623 } 624 //if all resources were read only, we didn't write a prepare record. 625 if (!rms.isEmpty()) { 626 try { 627 txnLog.commit(xid, logMark); 628 } catch (LogException e) { 629 log.error("Unexpected exception logging commit completion for xid " + xid, e); 630 throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e); 631 } 632 } 633 synchronized (this) { 634 status = Status.STATUS_COMMITTED; 635 } 636 if (cause != null) { 637 throw cause; 638 } 639 } 640 641 private static String getStateString(int status) { 642 switch (status) { 643 case Status.STATUS_ACTIVE: 644 return "STATUS_ACTIVE"; 645 case Status.STATUS_PREPARING: 646 return "STATUS_PREPARING"; 647 case Status.STATUS_PREPARED: 648 return "STATUS_PREPARED"; 649 case Status.STATUS_MARKED_ROLLBACK: 650 return "STATUS_MARKED_ROLLBACK"; 651 case Status.STATUS_ROLLING_BACK: 652 return "STATUS_ROLLING_BACK"; 653 case Status.STATUS_COMMITTING: 654 return "STATUS_COMMITTING"; 655 case Status.STATUS_COMMITTED: 656 return "STATUS_COMMITTED"; 657 case Status.STATUS_ROLLEDBACK: 658 return "STATUS_ROLLEDBACK"; 659 case Status.STATUS_NO_TRANSACTION: 660 return "STATUS_NO_TRANSACTION"; 661 case Status.STATUS_UNKNOWN: 662 return "STATUS_UNKNOWN"; 663 default: 664 throw new AssertionError(); 665 } 666 } 667 668 public boolean equals(Object obj) { 669 if (obj instanceof TransactionImpl) { 670 TransactionImpl other = (TransactionImpl) obj; 671 return xid.equals(other.xid); 672 } else { 673 return false; 674 } 675 } 676 677 //when used from recovery, do not add manager to active or suspended resource maps. 678 // The xaresources have already been ended with TMSUCCESS. 679 public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) { 680 TransactionBranch manager = new TransactionBranch(xaRes, branchId); 681 resourceManagers.add(manager); 682 return manager; 683 } 684 685 public Object getEntityManager(String persistenceUnit) { 686 return entityManagers.get(persistenceUnit); 687 } 688 689 public void setEntityManager(String persistenceUnit, Object entityManager) { 690 Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager); 691 if (oldEntityManager != null) { 692 throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid); 693 } 694 } 695 696 private static class TransactionBranch implements TransactionBranchInfo { 697 private final XAResource committer; 698 private final Xid branchId; 699 700 public TransactionBranch(XAResource xaRes, Xid branchId) { 701 committer = xaRes; 702 this.branchId = branchId; 703 } 704 705 public XAResource getCommitter() { 706 return committer; 707 } 708 709 public Xid getBranchId() { 710 return branchId; 711 } 712 713 public String getResourceName() { 714 if (committer instanceof NamedXAResource) { 715 return ((NamedXAResource) committer).getName(); 716 } else { 717 throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer); 718 } 719 } 720 721 public Xid getBranchXid() { 722 return branchId; 723 } 724 } 725 726 727 }