001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.transaction.manager; 019 020 import java.util.ArrayList; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 026 import javax.transaction.*; 027 import javax.transaction.xa.XAException; 028 import javax.transaction.xa.Xid; 029 030 import java.util.concurrent.ConcurrentHashMap; 031 import java.util.concurrent.CopyOnWriteArrayList; 032 import java.util.concurrent.atomic.AtomicLong; 033 import org.apache.geronimo.transaction.log.UnrecoverableLog; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 037 /** 038 * Simple implementation of a transaction manager. 039 * 040 * @version $Rev: 727065 $ $Date: 2008-12-16 10:22:22 -0500 (Tue, 16 Dec 2008) $ 041 */ 042 public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager { 043 private static final Logger log = LoggerFactory.getLogger(TransactionManagerImpl.class); 044 protected static final int DEFAULT_TIMEOUT = 600; 045 protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68}; 046 047 final TransactionLog transactionLog; 048 final XidFactory xidFactory; 049 private final int defaultTransactionTimeoutMilliseconds; 050 private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal(); 051 private final ThreadLocal threadTx = new ThreadLocal(); 052 private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap(); 053 private static final Logger recoveryLog = LoggerFactory.getLogger("RecoveryController"); 054 final Recovery recovery; 055 private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList(); 056 private List recoveryErrors = new ArrayList(); 057 // statistics 058 private AtomicLong totalCommits = new AtomicLong(0); 059 private AtomicLong totalRollBacks = new AtomicLong(0); 060 private AtomicLong activeCount = new AtomicLong(0); 061 062 public TransactionManagerImpl() throws XAException { 063 this(DEFAULT_TIMEOUT, 064 null, 065 null 066 ); 067 } 068 069 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException { 070 this(defaultTransactionTimeoutSeconds, 071 null, 072 null 073 ); 074 } 075 076 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException { 077 this(defaultTransactionTimeoutSeconds, 078 null, 079 transactionLog 080 ); 081 } 082 083 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException { 084 if (defaultTransactionTimeoutSeconds <= 0) { 085 throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds); 086 } 087 this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000; 088 089 if (transactionLog == null) { 090 this.transactionLog = new UnrecoverableLog(); 091 } else { 092 this.transactionLog = transactionLog; 093 } 094 095 if (xidFactory != null) { 096 this.xidFactory = xidFactory; 097 } else { 098 this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID); 099 } 100 101 recovery = new RecoveryImpl(this.transactionLog, this.xidFactory); 102 recovery.recoverLog(); 103 } 104 105 public Transaction getTransaction() { 106 return (Transaction) threadTx.get(); 107 } 108 109 private void associate(TransactionImpl tx) throws InvalidTransactionException { 110 if (tx.getStatus() == Status.STATUS_NO_TRANSACTION) { 111 throw new InvalidTransactionException("Cannot resume invalid transaction: " + tx); 112 } else { 113 Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread()); 114 if (existingAssociation != null) { 115 throw new InvalidTransactionException("Specified transaction is already associated with another thread"); 116 } 117 threadTx.set(tx); 118 fireThreadAssociated(tx); 119 activeCount.getAndIncrement(); 120 } 121 } 122 123 private void unassociate() { 124 Transaction tx = getTransaction(); 125 if (tx != null) { 126 associatedTransactions.remove(tx); 127 threadTx.set(null); 128 fireThreadUnassociated(tx); 129 activeCount.getAndDecrement(); 130 } 131 } 132 133 public void setTransactionTimeout(int seconds) throws SystemException { 134 if (seconds < 0) { 135 throw new SystemException("transaction timeout must be positive or 0 to reset to default"); 136 } 137 if (seconds == 0) { 138 transactionTimeoutMilliseconds.set(null); 139 } else { 140 transactionTimeoutMilliseconds.set(new Long(seconds * 1000)); 141 } 142 } 143 144 public int getStatus() throws SystemException { 145 Transaction tx = getTransaction(); 146 return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION; 147 } 148 149 public void begin() throws NotSupportedException, SystemException { 150 begin(getTransactionTimeoutMilliseconds(0L)); 151 } 152 153 public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException { 154 if (getStatus() != Status.STATUS_NO_TRANSACTION) { 155 throw new NotSupportedException("Nested Transactions are not supported"); 156 } 157 TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 158 // timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 159 try { 160 associate(tx); 161 } catch (InvalidTransactionException e) { 162 // should not be possible since we just created that transaction and no one has a reference yet 163 throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e); 164 } 165 // Todo: Verify if this is correct thing to do. Use default timeout for next transaction. 166 this.transactionTimeoutMilliseconds.set(null); 167 return tx; 168 } 169 170 public Transaction suspend() throws SystemException { 171 Transaction tx = getTransaction(); 172 if (tx != null) { 173 unassociate(); 174 } 175 return tx; 176 } 177 178 public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException { 179 if (getTransaction() != null && tx != getTransaction()) { 180 throw new IllegalStateException("Thread already associated with another transaction"); 181 } 182 if (tx != null && tx != getTransaction()) { 183 if (!(tx instanceof TransactionImpl)) { 184 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx); 185 } 186 187 associate((TransactionImpl) tx); 188 } 189 } 190 191 public Object getResource(Object key) { 192 TransactionImpl tx = getActiveTransactionImpl(); 193 return tx.getResource(key); 194 } 195 196 private TransactionImpl getActiveTransactionImpl() { 197 TransactionImpl tx = (TransactionImpl)threadTx.get(); 198 if (tx == null) { 199 throw new IllegalStateException("No tx on thread"); 200 } 201 if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) { 202 throw new IllegalStateException("Transaction " + tx + " is not active"); 203 } 204 return tx; 205 } 206 207 public boolean getRollbackOnly() { 208 TransactionImpl tx = getActiveTransactionImpl(); 209 return tx.getRollbackOnly(); 210 } 211 212 public Object getTransactionKey() { 213 TransactionImpl tx = (TransactionImpl) getTransaction(); 214 return tx == null ? null: tx.getTransactionKey(); 215 } 216 217 public int getTransactionStatus() { 218 TransactionImpl tx = (TransactionImpl) getTransaction(); 219 return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus(); 220 } 221 222 public void putResource(Object key, Object value) { 223 TransactionImpl tx = getActiveTransactionImpl(); 224 tx.putResource(key, value); 225 } 226 227 /** 228 * jta 1.1 method so the jpa implementations can be told to flush their caches. 229 * @param synchronization 230 */ 231 public void registerInterposedSynchronization(Synchronization synchronization) { 232 TransactionImpl tx = getActiveTransactionImpl(); 233 tx.registerInterposedSynchronization(synchronization); 234 } 235 236 public void setRollbackOnly() throws IllegalStateException { 237 TransactionImpl tx = (TransactionImpl) threadTx.get(); 238 if (tx == null) { 239 throw new IllegalStateException("No transaction associated with current thread"); 240 } 241 tx.setRollbackOnly(); 242 } 243 244 public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException { 245 Transaction tx = getTransaction(); 246 if (tx == null) { 247 throw new IllegalStateException("No transaction associated with current thread"); 248 } 249 try { 250 tx.commit(); 251 } finally { 252 unassociate(); 253 } 254 totalCommits.getAndIncrement(); 255 } 256 257 public void rollback() throws IllegalStateException, SecurityException, SystemException { 258 Transaction tx = getTransaction(); 259 if (tx == null) { 260 throw new IllegalStateException("No transaction associated with current thread"); 261 } 262 try { 263 tx.rollback(); 264 } finally { 265 unassociate(); 266 } 267 totalRollBacks.getAndIncrement(); 268 } 269 270 //XidImporter implementation 271 public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException { 272 if (transactionTimeoutMilliseconds < 0) { 273 throw new SystemException("transaction timeout must be positive or 0 to reset to default"); 274 } 275 TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds)); 276 return tx; 277 } 278 279 public void commit(Transaction tx, boolean onePhase) throws XAException { 280 if (onePhase) { 281 try { 282 tx.commit(); 283 } catch (HeuristicMixedException e) { 284 throw (XAException) new XAException().initCause(e); 285 } catch (HeuristicRollbackException e) { 286 throw (XAException) new XAException().initCause(e); 287 } catch (RollbackException e) { 288 throw (XAException) new XAException().initCause(e); 289 } catch (SecurityException e) { 290 throw (XAException) new XAException().initCause(e); 291 } catch (SystemException e) { 292 throw (XAException) new XAException().initCause(e); 293 } 294 } else { 295 try { 296 ((TransactionImpl) tx).preparedCommit(); 297 } catch (HeuristicMixedException e) { 298 throw (XAException) new XAException().initCause(e); 299 } catch (HeuristicRollbackException e) { 300 throw (XAException) new XAException().initCause(e); 301 } catch (SystemException e) { 302 throw (XAException) new XAException().initCause(e); 303 } 304 } 305 totalCommits.getAndIncrement(); 306 } 307 308 public void forget(Transaction tx) throws XAException { 309 //TODO implement this! 310 } 311 312 public int prepare(Transaction tx) throws XAException { 313 try { 314 return ((TransactionImpl) tx).prepare(); 315 } catch (SystemException e) { 316 throw (XAException) new XAException().initCause(e); 317 } catch (RollbackException e) { 318 throw (XAException) new XAException().initCause(e); 319 } 320 } 321 322 public void rollback(Transaction tx) throws XAException { 323 try { 324 tx.rollback(); 325 } catch (IllegalStateException e) { 326 throw (XAException) new XAException().initCause(e); 327 } catch (SystemException e) { 328 throw (XAException) new XAException().initCause(e); 329 } 330 totalRollBacks.getAndIncrement(); 331 } 332 333 long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) { 334 if (transactionTimeoutMilliseconds != 0) { 335 return transactionTimeoutMilliseconds; 336 } 337 Long timeout = (Long) this.transactionTimeoutMilliseconds.get(); 338 if (timeout != null) { 339 return timeout.longValue(); 340 } 341 return defaultTransactionTimeoutMilliseconds; 342 } 343 344 //Recovery 345 public void recoveryError(Exception e) { 346 recoveryLog.error("Recovery error", e); 347 recoveryErrors.add(e); 348 } 349 350 public void recoverResourceManager(NamedXAResource xaResource) { 351 try { 352 recovery.recoverResourceManager(xaResource); 353 } catch (XAException e) { 354 recoveryError(e); 355 } 356 } 357 358 public Map getExternalXids() { 359 return new HashMap(recovery.getExternalXids()); 360 } 361 362 public void addTransactionAssociationListener(TransactionManagerMonitor listener) { 363 transactionAssociationListeners.addIfAbsent(listener); 364 } 365 366 public void removeTransactionAssociationListener(TransactionManagerMonitor listener) { 367 transactionAssociationListeners.remove(listener); 368 } 369 370 protected void fireThreadAssociated(Transaction tx) { 371 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) { 372 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next(); 373 try { 374 listener.threadAssociated(tx); 375 } catch (Exception e) { 376 log.warn("Error calling transaction association listener", e); 377 } 378 } 379 } 380 381 protected void fireThreadUnassociated(Transaction tx) { 382 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) { 383 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next(); 384 try { 385 listener.threadUnassociated(tx); 386 } catch (Exception e) { 387 log.warn("Error calling transaction association listener", e); 388 } 389 } 390 } 391 392 /** 393 * Returns the number of active transactions. 394 */ 395 public long getActiveCount() { 396 return activeCount.longValue(); 397 } 398 399 /** 400 * Return the number of total commits 401 */ 402 public long getTotalCommits() { 403 return totalCommits.longValue(); 404 } 405 406 /** 407 * Returns the number of total rollbacks 408 */ 409 public long getTotalRollbacks() { 410 return totalRollBacks.longValue(); 411 } 412 413 /** 414 * Reset statistics 415 */ 416 public void resetStatistics() { 417 totalCommits.getAndSet(0); 418 totalRollBacks.getAndSet(0); 419 } 420 }