1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.geronimo.transaction.manager;
19
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25
26 import javax.transaction.*;
27 import javax.transaction.xa.XAException;
28 import javax.transaction.xa.Xid;
29
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.CopyOnWriteArrayList;
32 import java.util.concurrent.atomic.AtomicLong;
33 import org.apache.geronimo.transaction.log.UnrecoverableLog;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37
38
39
40
41
42 public class TransactionManagerImpl implements TransactionManager, UserTransaction, TransactionSynchronizationRegistry, XidImporter, MonitorableTransactionManager, RecoverableTransactionManager {
43 private static final Logger log = LoggerFactory.getLogger(TransactionManagerImpl.class);
44 protected static final int DEFAULT_TIMEOUT = 600;
45 protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
46
47 final TransactionLog transactionLog;
48 final XidFactory xidFactory;
49 private final int defaultTransactionTimeoutMilliseconds;
50 private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
51 private final ThreadLocal threadTx = new ThreadLocal();
52 private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
53 private static final Logger recoveryLog = LoggerFactory.getLogger("RecoveryController");
54 final Recovery recovery;
55 private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
56 private List recoveryErrors = new ArrayList();
57
58 private AtomicLong totalCommits = new AtomicLong(0);
59 private AtomicLong totalRollBacks = new AtomicLong(0);
60 private AtomicLong activeCount = new AtomicLong(0);
61
/**
 * Creates a transaction manager with the {@link #DEFAULT_TIMEOUT} and no
 * explicit xid factory or transaction log (the main constructor substitutes
 * its defaults for both).
 *
 * @throws XAException propagated from the main constructor
 */
public TransactionManagerImpl() throws XAException {
    this(DEFAULT_TIMEOUT, null, null);
}
68
/**
 * Creates a transaction manager with the given default timeout and no
 * explicit xid factory or transaction log.
 *
 * @param defaultTransactionTimeoutSeconds default timeout in seconds; must be positive
 * @throws XAException propagated from the main constructor
 */
public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
    this(defaultTransactionTimeoutSeconds, null, null);
}
75
/**
 * Creates a transaction manager with the given default timeout and
 * transaction log, and no explicit xid factory.
 *
 * @param defaultTransactionTimeoutSeconds default timeout in seconds; must be positive
 * @param transactionLog log to use, or null to let the main constructor pick its default
 * @throws XAException propagated from the main constructor
 */
public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
    this(defaultTransactionTimeoutSeconds, null, transactionLog);
}
82
/**
 * Creates a transaction manager, substituting defaults for missing
 * collaborators, and then runs log recovery.
 *
 * @param defaultTransactionTimeoutSeconds default timeout in seconds; must be positive.
 *        Values whose millisecond equivalent does not fit in an int are clamped to
 *        {@code Integer.MAX_VALUE} milliseconds.
 * @param xidFactory factory to use, or null for a {@code XidFactoryImpl} built from {@link #DEFAULT_TM_ID}
 * @param transactionLog log to use, or null for an {@code UnrecoverableLog}
 * @throws IllegalArgumentException if the timeout is zero or negative
 * @throws XAException if recovering the transaction log fails
 */
public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog) throws XAException {
    if (defaultTransactionTimeoutSeconds <= 0) {
        throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
    }
    // Multiply in long arithmetic: an int multiplication silently wraps for
    // values above Integer.MAX_VALUE / 1000 and could yield a negative timeout.
    long timeoutMillis = defaultTransactionTimeoutSeconds * 1000L;
    if (timeoutMillis > Integer.MAX_VALUE) {
        log.warn("defaultTransactionTimeoutSeconds {} does not fit in an int when converted to milliseconds; using {} ms instead",
                Integer.valueOf(defaultTransactionTimeoutSeconds), Integer.valueOf(Integer.MAX_VALUE));
        timeoutMillis = Integer.MAX_VALUE;
    }
    this.defaultTransactionTimeoutMilliseconds = (int) timeoutMillis;

    if (transactionLog == null) {
        this.transactionLog = new UnrecoverableLog();
    } else {
        this.transactionLog = transactionLog;
    }

    if (xidFactory != null) {
        this.xidFactory = xidFactory;
    } else {
        this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
    }

    // Recovery uses the resolved (never null) log and factory.
    recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
    recovery.recoverLog();
}
104
105 public Transaction getTransaction() {
106 return (Transaction) threadTx.get();
107 }
108
109 private void associate(TransactionImpl tx) throws InvalidTransactionException {
110 if (tx.getStatus() == Status.STATUS_NO_TRANSACTION) {
111 throw new InvalidTransactionException("Cannot resume invalid transaction: " + tx);
112 } else {
113 Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
114 if (existingAssociation != null) {
115 throw new InvalidTransactionException("Specified transaction is already associated with another thread");
116 }
117 threadTx.set(tx);
118 fireThreadAssociated(tx);
119 activeCount.getAndIncrement();
120 }
121 }
122
123 private void unassociate() {
124 Transaction tx = getTransaction();
125 if (tx != null) {
126 associatedTransactions.remove(tx);
127 threadTx.set(null);
128 fireThreadUnassociated(tx);
129 activeCount.getAndDecrement();
130 }
131 }
132
133 public void setTransactionTimeout(int seconds) throws SystemException {
134 if (seconds < 0) {
135 throw new SystemException("transaction timeout must be positive or 0 to reset to default");
136 }
137 if (seconds == 0) {
138 transactionTimeoutMilliseconds.set(null);
139 } else {
140 transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
141 }
142 }
143
144 public int getStatus() throws SystemException {
145 Transaction tx = getTransaction();
146 return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
147 }
148
149 public void begin() throws NotSupportedException, SystemException {
150 begin(getTransactionTimeoutMilliseconds(0L));
151 }
152
153 public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
154 if (getStatus() != Status.STATUS_NO_TRANSACTION) {
155 throw new NotSupportedException("Nested Transactions are not supported");
156 }
157 TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
158
159 try {
160 associate(tx);
161 } catch (InvalidTransactionException e) {
162
163 throw (SystemException)new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction").initCause(e);
164 }
165
166 this.transactionTimeoutMilliseconds.set(null);
167 return tx;
168 }
169
170 public Transaction suspend() throws SystemException {
171 Transaction tx = getTransaction();
172 if (tx != null) {
173 unassociate();
174 }
175 return tx;
176 }
177
178 public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
179 if (getTransaction() != null && tx != getTransaction()) {
180 throw new IllegalStateException("Thread already associated with another transaction");
181 }
182 if (tx != null && tx != getTransaction()) {
183 if (!(tx instanceof TransactionImpl)) {
184 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
185 }
186
187 associate((TransactionImpl) tx);
188 }
189 }
190
191 public Object getResource(Object key) {
192 TransactionImpl tx = getActiveTransactionImpl();
193 return tx.getResource(key);
194 }
195
196 private TransactionImpl getActiveTransactionImpl() {
197 TransactionImpl tx = (TransactionImpl)threadTx.get();
198 if (tx == null) {
199 throw new IllegalStateException("No tx on thread");
200 }
201 if (tx.getStatus() != Status.STATUS_ACTIVE && tx.getStatus() != Status.STATUS_MARKED_ROLLBACK) {
202 throw new IllegalStateException("Transaction " + tx + " is not active");
203 }
204 return tx;
205 }
206
207 public boolean getRollbackOnly() {
208 TransactionImpl tx = getActiveTransactionImpl();
209 return tx.getRollbackOnly();
210 }
211
212 public Object getTransactionKey() {
213 TransactionImpl tx = (TransactionImpl) getTransaction();
214 return tx == null ? null: tx.getTransactionKey();
215 }
216
217 public int getTransactionStatus() {
218 TransactionImpl tx = (TransactionImpl) getTransaction();
219 return tx == null? Status.STATUS_NO_TRANSACTION: tx.getTransactionStatus();
220 }
221
222 public void putResource(Object key, Object value) {
223 TransactionImpl tx = getActiveTransactionImpl();
224 tx.putResource(key, value);
225 }
226
227
228
229
230
231 public void registerInterposedSynchronization(Synchronization synchronization) {
232 TransactionImpl tx = getActiveTransactionImpl();
233 tx.registerInterposedSynchronization(synchronization);
234 }
235
236 public void setRollbackOnly() throws IllegalStateException {
237 TransactionImpl tx = (TransactionImpl) threadTx.get();
238 if (tx == null) {
239 throw new IllegalStateException("No transaction associated with current thread");
240 }
241 tx.setRollbackOnly();
242 }
243
244 public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
245 Transaction tx = getTransaction();
246 if (tx == null) {
247 throw new IllegalStateException("No transaction associated with current thread");
248 }
249 try {
250 tx.commit();
251 } finally {
252 unassociate();
253 }
254 totalCommits.getAndIncrement();
255 }
256
257 public void rollback() throws IllegalStateException, SecurityException, SystemException {
258 Transaction tx = getTransaction();
259 if (tx == null) {
260 throw new IllegalStateException("No transaction associated with current thread");
261 }
262 try {
263 tx.rollback();
264 } finally {
265 unassociate();
266 }
267 totalRollBacks.getAndIncrement();
268 }
269
270
271 public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
272 if (transactionTimeoutMilliseconds < 0) {
273 throw new SystemException("transaction timeout must be positive or 0 to reset to default");
274 }
275 TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
276 return tx;
277 }
278
279 public void commit(Transaction tx, boolean onePhase) throws XAException {
280 if (onePhase) {
281 try {
282 tx.commit();
283 } catch (HeuristicMixedException e) {
284 throw (XAException) new XAException().initCause(e);
285 } catch (HeuristicRollbackException e) {
286 throw (XAException) new XAException().initCause(e);
287 } catch (RollbackException e) {
288 throw (XAException) new XAException().initCause(e);
289 } catch (SecurityException e) {
290 throw (XAException) new XAException().initCause(e);
291 } catch (SystemException e) {
292 throw (XAException) new XAException().initCause(e);
293 }
294 } else {
295 try {
296 ((TransactionImpl) tx).preparedCommit();
297 } catch (HeuristicMixedException e) {
298 throw (XAException) new XAException().initCause(e);
299 } catch (HeuristicRollbackException e) {
300 throw (XAException) new XAException().initCause(e);
301 } catch (SystemException e) {
302 throw (XAException) new XAException().initCause(e);
303 }
304 }
305 totalCommits.getAndIncrement();
306 }
307
308 public void forget(Transaction tx) throws XAException {
309
310 }
311
312 public int prepare(Transaction tx) throws XAException {
313 try {
314 return ((TransactionImpl) tx).prepare();
315 } catch (SystemException e) {
316 throw (XAException) new XAException().initCause(e);
317 } catch (RollbackException e) {
318 throw (XAException) new XAException().initCause(e);
319 }
320 }
321
322 public void rollback(Transaction tx) throws XAException {
323 try {
324 tx.rollback();
325 } catch (IllegalStateException e) {
326 throw (XAException) new XAException().initCause(e);
327 } catch (SystemException e) {
328 throw (XAException) new XAException().initCause(e);
329 }
330 totalRollBacks.getAndIncrement();
331 }
332
333 long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
334 if (transactionTimeoutMilliseconds != 0) {
335 return transactionTimeoutMilliseconds;
336 }
337 Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
338 if (timeout != null) {
339 return timeout.longValue();
340 }
341 return defaultTransactionTimeoutMilliseconds;
342 }
343
344
345 public void recoveryError(Exception e) {
346 recoveryLog.error("Recovery error", e);
347 recoveryErrors.add(e);
348 }
349
350 public void recoverResourceManager(NamedXAResource xaResource) {
351 try {
352 recovery.recoverResourceManager(xaResource);
353 } catch (XAException e) {
354 recoveryError(e);
355 }
356 }
357
358 public Map getExternalXids() {
359 return new HashMap(recovery.getExternalXids());
360 }
361
362 public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
363 transactionAssociationListeners.addIfAbsent(listener);
364 }
365
366 public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
367 transactionAssociationListeners.remove(listener);
368 }
369
370 protected void fireThreadAssociated(Transaction tx) {
371 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
372 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
373 try {
374 listener.threadAssociated(tx);
375 } catch (Exception e) {
376 log.warn("Error calling transaction association listener", e);
377 }
378 }
379 }
380
381 protected void fireThreadUnassociated(Transaction tx) {
382 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
383 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
384 try {
385 listener.threadUnassociated(tx);
386 } catch (Exception e) {
387 log.warn("Error calling transaction association listener", e);
388 }
389 }
390 }
391
392
393
394
395 public long getActiveCount() {
396 return activeCount.longValue();
397 }
398
399
400
401
402 public long getTotalCommits() {
403 return totalCommits.longValue();
404 }
405
406
407
408
409 public long getTotalRollbacks() {
410 return totalRollBacks.longValue();
411 }
412
413
414
415
416 public void resetStatistics() {
417 totalCommits.getAndSet(0);
418 totalRollBacks.getAndSet(0);
419 }
420 }