001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.geronimo.transaction.manager; 020 021 import java.util.ArrayList; 022 import java.util.Collection; 023 import java.util.HashMap; 024 import java.util.Iterator; 025 import java.util.List; 026 import java.util.Map; 027 028 import javax.transaction.HeuristicMixedException; 029 import javax.transaction.HeuristicRollbackException; 030 import javax.transaction.InvalidTransactionException; 031 import javax.transaction.NotSupportedException; 032 import javax.transaction.RollbackException; 033 import javax.transaction.Status; 034 import javax.transaction.Synchronization; 035 import javax.transaction.SystemException; 036 import javax.transaction.Transaction; 037 import javax.transaction.TransactionManager; 038 import javax.transaction.UserTransaction; 039 import javax.transaction.xa.XAException; 040 import javax.transaction.xa.Xid; 041 042 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; 043 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; 044 import org.apache.commons.logging.Log; 045 import org.apache.commons.logging.LogFactory; 046 import org.apache.geronimo.transaction.log.UnrecoverableLog; 047 048 /** 049 * Simple implementation of a transaction manager. 050 * 051 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $ 052 */ 053 public class TransactionManagerImpl implements TransactionManager, UserTransaction, XidImporter, MonitorableTransactionManager { 054 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class); 055 protected static final int DEFAULT_TIMEOUT = 600; 056 protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68}; 057 058 final TransactionLog transactionLog; 059 final XidFactory xidFactory; 060 private final int defaultTransactionTimeoutMilliseconds; 061 private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal(); 062 private final ThreadLocal threadTx = new ThreadLocal(); 063 private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap(); 064 private static final Log recoveryLog = LogFactory.getLog("RecoveryController"); 065 final Recovery recovery; 066 final Collection resourceManagers; 067 private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList(); 068 private List recoveryErrors = new ArrayList(); 069 070 public TransactionManagerImpl() throws XAException { 071 this(DEFAULT_TIMEOUT, 072 null, 073 null, 074 null); 075 } 076 077 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException { 078 this(defaultTransactionTimeoutSeconds, 079 null, 080 null, 081 null); 082 } 083 084 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException { 085 this(defaultTransactionTimeoutSeconds, 086 null, 087 transactionLog, 088 null); 089 } 090 091 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog, Collection resourceManagers) throws XAException { 092 if (defaultTransactionTimeoutSeconds <= 0) { 093 throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds); 094 } 095 096 this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000; 097 098 if (transactionLog == null) { 099 this.transactionLog = new UnrecoverableLog(); 100 } else { 101 this.transactionLog = transactionLog; 102 } 103 104 if (xidFactory != null) { 105 this.xidFactory = xidFactory; 106 } else { 107 this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID); 108 } 109 110 this.resourceManagers = resourceManagers; 111 recovery = new RecoveryImpl(this.transactionLog, this.xidFactory); 112 113 if (resourceManagers != null) { 114 recovery.recoverLog(); 115 List copy = watchResourceManagers(resourceManagers); 116 for (Iterator iterator = copy.iterator(); iterator.hasNext();) { 117 ResourceManager resourceManager = (ResourceManager) iterator.next(); 118 recoverResourceManager(resourceManager); 119 } 120 } 121 } 122 123 protected List watchResourceManagers(Collection resourceManagers) { 124 return new ArrayList(resourceManagers); 125 } 126 127 public Transaction getTransaction() { 128 return (Transaction) threadTx.get(); 129 } 130 131 private void associate(TransactionImpl tx) throws InvalidTransactionException { 132 if (tx == null) throw new NullPointerException("tx is null"); 133 134 Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread()); 135 if (existingAssociation != null) { 136 throw new InvalidTransactionException("Specified transaction is already associated with another thread"); 137 } 138 threadTx.set(tx); 139 fireThreadAssociated(tx); 140 } 141 142 private void unassociate() { 143 Transaction tx = getTransaction(); 144 if (tx != null) { 145 associatedTransactions.remove(tx); 146 threadTx.set(null); 147 fireThreadUnassociated(tx); 148 } 149 } 150 151 public void setTransactionTimeout(int seconds) throws SystemException { 152 if (seconds < 0) { 153 throw new SystemException("transaction timeout must be positive or 0 to reset to default"); 154 } 155 if (seconds == 0) { 156 transactionTimeoutMilliseconds.set(null); 157 } else { 158 transactionTimeoutMilliseconds.set(new Long(seconds * 1000)); 159 } 160 } 161 162 public int getStatus() throws SystemException { 163 Transaction tx = getTransaction(); 164 return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION; 165 } 166 167 public void begin() throws NotSupportedException, SystemException { 168 begin(getTransactionTimeoutMilliseconds(0L)); 169 } 170 171 public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException { 172 if (getStatus() != Status.STATUS_NO_TRANSACTION) { 173 throw new NotSupportedException("Nested Transactions are not supported"); 174 } 175 TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 176 // timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 177 try { 178 associate(tx); 179 } catch (InvalidTransactionException e) { 180 // should not be possible since we just created that transaction and no one has a reference yet 181 throw new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction"); 182 } 183 // Todo: Verify if this is correct thing to do. Use default timeout for next transaction. 184 this.transactionTimeoutMilliseconds.set(null); 185 return tx; 186 } 187 188 public Transaction suspend() throws SystemException { 189 Transaction tx = getTransaction(); 190 if (tx != null) { 191 unassociate(); 192 } 193 return tx; 194 } 195 196 public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException { 197 if (getTransaction() != null) { 198 throw new IllegalStateException("Thread already associated with another transaction"); 199 } 200 if (!(tx instanceof TransactionImpl)) { 201 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx); 202 } 203 associate((TransactionImpl) tx); 204 } 205 206 public Object getResource(Object key) { 207 TransactionImpl tx = getActiveTransactionImpl(); 208 return tx.getResource(key); 209 } 210 211 private TransactionImpl getActiveTransactionImpl() { 212 TransactionImpl tx = (TransactionImpl)threadTx.get(); 213 if (tx == null) { 214 throw new IllegalStateException("No tx on thread"); 215 } 216 if (tx.getStatus() != Status.STATUS_ACTIVE) { 217 throw new IllegalStateException("Transaction " + tx + " is not active"); 218 } 219 return tx; 220 } 221 222 public boolean getRollbackOnly() { 223 TransactionImpl tx = getActiveTransactionImpl(); 224 return tx.getRollbackOnly(); 225 } 226 227 public Object getTransactionKey() { 228 TransactionImpl tx = getActiveTransactionImpl(); 229 return tx.getTransactionKey(); 230 } 231 232 public int getTransactionStatus() { 233 TransactionImpl tx = getActiveTransactionImpl(); 234 return tx.getTransactionStatus(); 235 } 236 237 public void putResource(Object key, Object value) { 238 TransactionImpl tx = getActiveTransactionImpl(); 239 tx.putResource(key, value); 240 } 241 242 /** 243 * jta 1.1 method so the jpa implementations can be told to flush their caches. 244 * @param synchronization 245 */ 246 public void registerInterposedSynchronization(Synchronization synchronization) { 247 TransactionImpl tx = getActiveTransactionImpl(); 248 tx.registerInterposedSynchronization(synchronization); 249 } 250 251 public void setRollbackOnly() throws IllegalStateException { 252 TransactionImpl tx = (TransactionImpl) threadTx.get(); 253 if (tx == null) { 254 throw new IllegalStateException("No transaction associated with current thread"); 255 } 256 tx.setRollbackOnly(); 257 } 258 259 public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException { 260 Transaction tx = getTransaction(); 261 if (tx == null) { 262 throw new IllegalStateException("No transaction associated with current thread"); 263 } 264 try { 265 tx.commit(); 266 } finally { 267 unassociate(); 268 } 269 } 270 271 public void rollback() throws IllegalStateException, SecurityException, SystemException { 272 Transaction tx = getTransaction(); 273 if (tx == null) { 274 throw new IllegalStateException("No transaction associated with current thread"); 275 } 276 try { 277 tx.rollback(); 278 } finally { 279 unassociate(); 280 } 281 } 282 283 //XidImporter implementation 284 public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException { 285 if (transactionTimeoutMilliseconds < 0) { 286 throw new SystemException("transaction timeout must be positive or 0 to reset to default"); 287 } 288 TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 289 return tx; 290 } 291 292 public void commit(Transaction tx, boolean onePhase) throws XAException { 293 if (onePhase) { 294 try { 295 tx.commit(); 296 } catch (HeuristicMixedException e) { 297 throw (XAException) new XAException().initCause(e); 298 } catch (HeuristicRollbackException e) { 299 throw (XAException) new XAException().initCause(e); 300 } catch (RollbackException e) { 301 throw (XAException) new XAException().initCause(e); 302 } catch (SecurityException e) { 303 throw (XAException) new XAException().initCause(e); 304 } catch (SystemException e) { 305 throw (XAException) new XAException().initCause(e); 306 } 307 } else { 308 try { 309 ((TransactionImpl) tx).preparedCommit(); 310 } catch (SystemException e) { 311 throw (XAException) new XAException().initCause(e); 312 } 313 } 314 } 315 316 public void forget(Transaction tx) throws XAException { 317 //TODO implement this! 318 } 319 320 public int prepare(Transaction tx) throws XAException { 321 try { 322 return ((TransactionImpl) tx).prepare(); 323 } catch (SystemException e) { 324 throw (XAException) new XAException().initCause(e); 325 } catch (RollbackException e) { 326 throw (XAException) new XAException().initCause(e); 327 } 328 } 329 330 public void rollback(Transaction tx) throws XAException { 331 try { 332 tx.rollback(); 333 } catch (IllegalStateException e) { 334 throw (XAException) new XAException().initCause(e); 335 } catch (SystemException e) { 336 throw (XAException) new XAException().initCause(e); 337 } 338 } 339 340 long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) { 341 if (transactionTimeoutMilliseconds != 0) { 342 return transactionTimeoutMilliseconds; 343 } 344 Long timeout = (Long) this.transactionTimeoutMilliseconds.get(); 345 if (timeout != null) { 346 return timeout.longValue(); 347 } 348 return defaultTransactionTimeoutMilliseconds; 349 } 350 351 protected void recoverResourceManager(ResourceManager resourceManager) { 352 NamedXAResource namedXAResource; 353 try { 354 namedXAResource = resourceManager.getRecoveryXAResources(); 355 } catch (SystemException e) { 356 recoveryLog.error(e); 357 recoveryErrors.add(e); 358 return; 359 } 360 if (namedXAResource != null) { 361 try { 362 recovery.recoverResourceManager(namedXAResource); 363 } catch (XAException e) { 364 recoveryLog.error(e); 365 recoveryErrors.add(e); 366 } finally { 367 resourceManager.returnResource(namedXAResource); 368 } 369 } 370 } 371 372 373 public Map getExternalXids() { 374 return new HashMap(recovery.getExternalXids()); 375 } 376 377 public void addTransactionAssociationListener(TransactionManagerMonitor listener) { 378 transactionAssociationListeners.addIfAbsent(listener); 379 } 380 381 public void removeTransactionAssociationListener(TransactionManagerMonitor listener) { 382 transactionAssociationListeners.remove(listener); 383 } 384 385 protected void fireThreadAssociated(Transaction tx) { 386 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) { 387 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next(); 388 try { 389 listener.threadAssociated(tx); 390 } catch (Exception e) { 391 log.warn("Error calling transaction association listener", e); 392 } 393 } 394 } 395 396 protected void fireThreadUnassociated(Transaction tx) { 397 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) { 398 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next(); 399 try { 400 listener.threadUnassociated(tx); 401 } catch (Exception e) { 402 log.warn("Error calling transaction association listener", e); 403 } 404 } 405 } 406 }