1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one or more
4 * contributor license agreements. See the NOTICE file distributed with
5 * this work for additional information regarding copyright ownership.
6 * The ASF licenses this file to You under the Apache License, Version 2.0
7 * (the "License"); you may not use this file except in compliance with
8 * the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.geronimo.timer;
20
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.Collections;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.Map;
28 import java.util.Timer;
29 import java.util.TimerTask;
30 import javax.transaction.RollbackException;
31 import javax.transaction.Status;
32 import javax.transaction.Synchronization;
33 import javax.transaction.SystemException;
34 import javax.transaction.Transaction;
35 import javax.transaction.TransactionManager;
36
37 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.geronimo.gbean.GBeanLifecycle;
42
43 /**
44 *
45 *
46 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
47 *
48 * */
49 public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle {
50
51 private static final Log log = LogFactory.getLog(ThreadPooledTimer.class);
52
53 private final ExecutorTaskFactory executorTaskFactory;
54 private final WorkerPersistence workerPersistence;
55 private final Executor executor;
56 private final TransactionManager transactionManager;
57
58 private Timer delegate;
59
60 private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap());
61
62
63 public ThreadPooledTimer() {
64 this(null, null, null, null);
65 }
66
67 public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) {
68 this.executorTaskFactory = executorTaskFactory;
69 this.workerPersistence = workerPersistence;
70 this.executor = executor;
71 this.transactionManager = transactionManager;
72 }
73
74 public void doStart() throws Exception {
75 delegate = new Timer(true);
76 }
77
78 public void doStop() {
79 if (delegate != null) {
80 delegate.cancel();
81 delegate = null;
82 }
83 }
84
85 public void doFail() {
86 doStop();
87 }
88
89 public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException {
90 if (delay < 0) {
91 throw new IllegalArgumentException("Negative delay: " + delay);
92 }
93 Date time = new Date(System.currentTimeMillis() + delay);
94 return schedule(key, userTaskFactory, userId, userInfo, time);
95 }
96
97 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException {
98 if (time ==null) {
99 throw new IllegalArgumentException("No time supplied");
100 }
101 if (time.getTime() < 0) {
102 throw new IllegalArgumentException("Negative time: " + time.getTime());
103 }
104 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false);
105 registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time));
106 addWorkInfo(worker);
107 return worker;
108 }
109
110 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException, SystemException {
111 if (delay < 0) {
112 throw new IllegalArgumentException("Negative delay: " + delay);
113 }
114 if (period < 0) {
115 throw new IllegalArgumentException("Negative period: " + period);
116 }
117 Date time = new Date(System.currentTimeMillis() + delay);
118 return schedule(key, userTaskFactory, userId, userInfo, time, period);
119 }
120
121 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
122 if (firstTime ==null) {
123 throw new IllegalArgumentException("No time supplied");
124 }
125 if (firstTime.getTime() < 0) {
126 throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
127 }
128 if (period < 0) {
129 throw new IllegalArgumentException("Negative period: " + period);
130 }
131 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), false);
132 registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
133 addWorkInfo(worker);
134 return worker;
135 }
136
137 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException {
138 if (delay < 0) {
139 throw new IllegalArgumentException("Negative delay: " + delay);
140 }
141 if (period < 0) {
142 throw new IllegalArgumentException("Negative period: " + period);
143 }
144 Date time = new Date(System.currentTimeMillis() + delay);
145 return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period);
146 }
147
148 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
149 if (firstTime ==null) {
150 throw new IllegalArgumentException("No time supplied");
151 }
152 if (firstTime.getTime() < 0) {
153 throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
154 }
155 if (period < 0) {
156 throw new IllegalArgumentException("Negative period: " + period);
157 }
158 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), true);
159 registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
160 addWorkInfo(worker);
161 return worker;
162 }
163
164 public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException {
165 PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
166 workerPersistence.playback(key, playback);
167 return playback.getWorkInfos();
168 }
169
170 public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
171 return workerPersistence.getIdsByKey(key, userId);
172 }
173
174 public WorkInfo getWorkInfo(Long id) {
175 return (WorkInfo) idToWorkInfoMap.get(id);
176 }
177
178 /**
179 * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
180 * affecting persisted timer data.
181 * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
182 */
183 public void cancelTimerTasks(Collection ids) {
184 for (Iterator iterator = ids.iterator(); iterator.hasNext();) {
185 Long idLong = (Long) iterator.next();
186 WorkInfo workInfo = getWorkInfo(idLong);
187 if (workInfo != null) {
188 TimerTask timerTask = workInfo.getExecutorFeedingTimerTask();
189 timerTask.cancel();
190 }
191 }
192 }
193
194 void addWorkInfo(WorkInfo worker) {
195 idToWorkInfoMap.put(new Long(worker.getId()), worker);
196 }
197
198 void removeWorkInfo(WorkInfo workInfo) {
199 idToWorkInfoMap.remove(new Long(workInfo.getId()));
200 }
201
202 void workPerformed(WorkInfo workInfo) throws PersistenceException {
203 if (workInfo.isOneTime()) {
204 workerPersistence.cancel(workInfo.getId());
205 } else if (workInfo.getAtFixedRate()) {
206 workerPersistence.fixedRateWorkPerformed(workInfo.getId());
207 workInfo.nextTime();
208 } else {
209 workInfo.nextInterval();
210 workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue());
211 }
212 }
213
214 Timer getTimer() {
215 if (delegate == null) {
216 throw new IllegalStateException("Timer is stopped");
217 }
218 return delegate;
219 }
220
221 WorkerPersistence getWorkerPersistence() {
222 return workerPersistence;
223 }
224
225 Executor getExecutor() {
226 return executor;
227 }
228
229 private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException {
230 if (time == null) {
231 throw new IllegalArgumentException("Null initial time");
232 }
233 WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
234
235 workerPersistence.save(workInfo);
236
237 Runnable userTask = userTaskFactory.newTask(workInfo.getId());
238 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this);
239 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this);
240 workInfo.initialize(worker, executorTask);
241 return workInfo;
242 }
243
244 void registerSynchronization(Synchronization sync) throws RollbackException, SystemException {
245 Transaction transaction = transactionManager.getTransaction();
246 int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus();
247
248 if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
249 transaction.registerSynchronization(sync);
250 } else {
251 sync.beforeCompletion();
252 sync.afterCompletion(Status.STATUS_COMMITTED);
253 }
254 }
255
256 private class ScheduleSynchronization implements Synchronization {
257
258 private final ExecutorFeedingTimerTask worker;
259 private final Date time;
260
261 public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) {
262 this.worker = worker;
263 this.time = time;
264 }
265
266 public void beforeCompletion() {
267 }
268
269 public void afterCompletion(int status) {
270 if (status == Status.STATUS_COMMITTED) {
271 if (worker.isCancelled()) {
272 log.trace("Worker is already cancelled, not scheduling");
273 return;
274 }
275 try {
276 getTimer().schedule(worker, time);
277 } catch (IllegalStateException e) {
278
279 log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
280 }
281 }
282 }
283 }
284
285 private class ScheduleRepeatedSynchronization implements Synchronization {
286
287 private final ExecutorFeedingTimerTask worker;
288 private final Date time;
289 private final long period;
290
291 public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
292 this.worker = worker;
293 this.time = time;
294 this.period = period;
295 }
296
297 public void beforeCompletion() {
298 }
299
300 public void afterCompletion(int status) {
301 if (status == Status.STATUS_COMMITTED) {
302 if (worker.isCancelled()) {
303 log.trace("Worker is already cancelled, not scheduling/period");
304 return;
305 }
306 try {
307 getTimer().schedule(worker, time, period);
308 } catch (Exception e) {
309 log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
310 }
311 }
312 }
313 }
314
315 private class ScheduleAtFixedRateSynchronization implements Synchronization {
316
317 private final ExecutorFeedingTimerTask worker;
318 private final Date time;
319 private final long period;
320
321 public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
322 this.worker = worker;
323 this.time = time;
324 this.period = period;
325 }
326
327 public void beforeCompletion() {
328 }
329
330 public void afterCompletion(int status) {
331 if (status == Status.STATUS_COMMITTED) {
332 if (worker.isCancelled()) {
333 log.trace("Worker is already cancelled, not scheduleAtFixedRate");
334 return;
335 }
336 try {
337 getTimer().scheduleAtFixedRate(worker, time, period);
338 } catch (Exception e) {
339 log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
340 }
341 }
342 }
343 }
344
345 private class PlaybackImpl implements Playback {
346
347 private final UserTaskFactory userTaskFactory;
348
349 private final Collection workInfos = new ArrayList();
350
351 public PlaybackImpl(UserTaskFactory userTaskFactory) {
352 this.userTaskFactory = userTaskFactory;
353 }
354
355 public void schedule(WorkInfo workInfo) {
356 Runnable userTask = userTaskFactory.newTask(workInfo.getId());
357 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this);
358 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this);
359 workInfo.initialize(worker, executorTask);
360 if (workInfo.getPeriod() == null) {
361 getTimer().schedule(worker, workInfo.getTime());
362 } else if (!workInfo.getAtFixedRate()) {
363 getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
364 } else {
365 getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
366 }
367 addWorkInfo(workInfo);
368 workInfos.add(workInfo);
369 }
370
371 public Collection getWorkInfos() {
372 return workInfos;
373 }
374
375 }
376
377 }