001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.timer;
019
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.Collections;
023 import java.util.Date;
024 import java.util.HashMap;
025 import java.util.Iterator;
026 import java.util.Map;
027 import java.util.Timer;
028 import java.util.TimerTask;
029 import javax.transaction.RollbackException;
030 import javax.transaction.Status;
031 import javax.transaction.Synchronization;
032 import javax.transaction.SystemException;
033 import javax.transaction.Transaction;
034 import javax.transaction.TransactionManager;
035
036 import java.util.concurrent.Executor;
037
038 import org.apache.commons.logging.Log;
039 import org.apache.commons.logging.LogFactory;
040 import org.apache.geronimo.gbean.GBeanLifecycle;
041
042 /**
043 *
044 *
045 * @version $Rev: 520573 $ $Date: 2007-03-20 16:48:26 -0400 (Tue, 20 Mar 2007) $
046 *
047 * */
048 public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle {
049
050 private static final Log log = LogFactory.getLog(ThreadPooledTimer.class);
051
052 private final ExecutorTaskFactory executorTaskFactory;
053 private final WorkerPersistence workerPersistence;
054 private final Executor executor;
055 private final TransactionManager transactionManager;
056
057 private Timer delegate;
058
059 private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap());
060
061 //default constructor for use as reference endpoint.
062 public ThreadPooledTimer() {
063 this(null, null, null, null);
064 }
065
066 public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) {
067 this.executorTaskFactory = executorTaskFactory;
068 this.workerPersistence = workerPersistence;
069 this.executor = executor;
070 this.transactionManager = transactionManager;
071 }
072
073 public void doStart() throws Exception {
074 delegate = new Timer(true);
075 }
076
077 public void doStop() {
078 if (delegate != null) {
079 delegate.cancel();
080 delegate = null;
081 }
082 }
083
084 public void doFail() {
085 doStop();
086 }
087
088 public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException {
089 if (delay < 0) {
090 throw new IllegalArgumentException("Negative delay: " + delay);
091 }
092 Date time = new Date(System.currentTimeMillis() + delay);
093 return schedule(key, userTaskFactory, userId, userInfo, time);
094 }
095
096 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException {
097 if (time ==null) {
098 throw new IllegalArgumentException("No time supplied");
099 }
100 if (time.getTime() < 0) {
101 throw new IllegalArgumentException("Negative time: " + time.getTime());
102 }
103 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false);
104 registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time));
105 addWorkInfo(worker);
106 return worker;
107 }
108
109 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException, SystemException {
110 if (delay < 0) {
111 throw new IllegalArgumentException("Negative delay: " + delay);
112 }
113 if (period < 0) {
114 throw new IllegalArgumentException("Negative period: " + period);
115 }
116 Date time = new Date(System.currentTimeMillis() + delay);
117 return schedule(key, userTaskFactory, userId, userInfo, time, period);
118 }
119
120 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
121 if (firstTime ==null) {
122 throw new IllegalArgumentException("No time supplied");
123 }
124 if (firstTime.getTime() < 0) {
125 throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
126 }
127 if (period < 0) {
128 throw new IllegalArgumentException("Negative period: " + period);
129 }
130 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), false);
131 registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
132 addWorkInfo(worker);
133 return worker;
134 }
135
136 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException {
137 if (delay < 0) {
138 throw new IllegalArgumentException("Negative delay: " + delay);
139 }
140 if (period < 0) {
141 throw new IllegalArgumentException("Negative period: " + period);
142 }
143 Date time = new Date(System.currentTimeMillis() + delay);
144 return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period);
145 }
146
147 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
148 if (firstTime ==null) {
149 throw new IllegalArgumentException("No time supplied");
150 }
151 if (firstTime.getTime() < 0) {
152 throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
153 }
154 if (period < 0) {
155 throw new IllegalArgumentException("Negative period: " + period);
156 }
157 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), true);
158 registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
159 addWorkInfo(worker);
160 return worker;
161 }
162
163 public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException {
164 PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
165 workerPersistence.playback(key, playback);
166 return playback.getWorkInfos();
167 }
168
169 public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
170 return workerPersistence.getIdsByKey(key, userId);
171 }
172
173 public WorkInfo getWorkInfo(Long id) {
174 return (WorkInfo) idToWorkInfoMap.get(id);
175 }
176
177 /**
178 * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
179 * affecting persisted timer data.
180 * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
181 */
182 public void cancelTimerTasks(Collection ids) {
183 for (Iterator iterator = ids.iterator(); iterator.hasNext();) {
184 Long idLong = (Long) iterator.next();
185 WorkInfo workInfo = getWorkInfo(idLong);
186 if (workInfo != null) {
187 TimerTask timerTask = workInfo.getExecutorFeedingTimerTask();
188 timerTask.cancel();
189 }
190 }
191 }
192
193 void addWorkInfo(WorkInfo worker) {
194 idToWorkInfoMap.put(new Long(worker.getId()), worker);
195 }
196
197 void removeWorkInfo(WorkInfo workInfo) {
198 idToWorkInfoMap.remove(new Long(workInfo.getId()));
199 }
200
201 void workPerformed(WorkInfo workInfo) throws PersistenceException {
202 if (workInfo.isOneTime()) {
203 workerPersistence.cancel(workInfo.getId());
204 } else if (workInfo.getAtFixedRate()) {
205 workerPersistence.fixedRateWorkPerformed(workInfo.getId());
206 workInfo.nextTime();
207 } else {
208 workInfo.nextInterval();
209 workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue());
210 }
211 }
212
213 Timer getTimer() {
214 if (delegate == null) {
215 throw new IllegalStateException("Timer is stopped");
216 }
217 return delegate;
218 }
219
220 WorkerPersistence getWorkerPersistence() {
221 return workerPersistence;
222 }
223
224 Executor getExecutor() {
225 return executor;
226 }
227
228 private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException {
229 if (time == null) {
230 throw new IllegalArgumentException("Null initial time");
231 }
232 WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
233 //save and assign id
234 workerPersistence.save(workInfo);
235
236 Runnable userTask = userTaskFactory.newTask(workInfo.getId());
237 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this);
238 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this);
239 workInfo.initialize(worker, executorTask);
240 return workInfo;
241 }
242
243 void registerSynchronization(Synchronization sync) throws RollbackException, SystemException {
244 Transaction transaction = transactionManager.getTransaction();
245 int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus();
246
247 if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
248 transaction.registerSynchronization(sync);
249 } else {
250 sync.beforeCompletion();
251 sync.afterCompletion(Status.STATUS_COMMITTED);
252 }
253 }
254
255 private class ScheduleSynchronization implements Synchronization {
256
257 private final ExecutorFeedingTimerTask worker;
258 private final Date time;
259
260 public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) {
261 this.worker = worker;
262 this.time = time;
263 }
264
265 public void beforeCompletion() {
266 }
267
268 public void afterCompletion(int status) {
269 if (status == Status.STATUS_COMMITTED) {
270 if (worker.isCancelled()) {
271 log.trace("Worker is already cancelled, not scheduling");
272 return;
273 }
274 try {
275 getTimer().schedule(worker, time);
276 } catch (IllegalStateException e) {
277 //TODO consider again if catching this exception is appropriate
278 log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
279 }
280 }
281 }
282 }
283
284 private class ScheduleRepeatedSynchronization implements Synchronization {
285
286 private final ExecutorFeedingTimerTask worker;
287 private final Date time;
288 private final long period;
289
290 public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
291 this.worker = worker;
292 this.time = time;
293 this.period = period;
294 }
295
296 public void beforeCompletion() {
297 }
298
299 public void afterCompletion(int status) {
300 if (status == Status.STATUS_COMMITTED) {
301 if (worker.isCancelled()) {
302 log.trace("Worker is already cancelled, not scheduling/period");
303 return;
304 }
305 try {
306 getTimer().schedule(worker, time, period);
307 } catch (Exception e) {
308 log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
309 }
310 }
311 }
312 }
313
314 private class ScheduleAtFixedRateSynchronization implements Synchronization {
315
316 private final ExecutorFeedingTimerTask worker;
317 private final Date time;
318 private final long period;
319
320 public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
321 this.worker = worker;
322 this.time = time;
323 this.period = period;
324 }
325
326 public void beforeCompletion() {
327 }
328
329 public void afterCompletion(int status) {
330 if (status == Status.STATUS_COMMITTED) {
331 if (worker.isCancelled()) {
332 log.trace("Worker is already cancelled, not scheduleAtFixedRate");
333 return;
334 }
335 try {
336 getTimer().scheduleAtFixedRate(worker, time, period);
337 } catch (Exception e) {
338 log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
339 }
340 }
341 }
342 }
343
344 private class PlaybackImpl implements Playback {
345
346 private final UserTaskFactory userTaskFactory;
347
348 private final Collection workInfos = new ArrayList();
349
350 public PlaybackImpl(UserTaskFactory userTaskFactory) {
351 this.userTaskFactory = userTaskFactory;
352 }
353
354 public void schedule(WorkInfo workInfo) {
355 Runnable userTask = userTaskFactory.newTask(workInfo.getId());
356 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this);
357 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this);
358 workInfo.initialize(worker, executorTask);
359 if (workInfo.getPeriod() == null) {
360 getTimer().schedule(worker, workInfo.getTime());
361 } else if (!workInfo.getAtFixedRate()) {
362 getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
363 } else {
364 getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
365 }
366 addWorkInfo(workInfo);
367 workInfos.add(workInfo);
368 }
369
370 public Collection getWorkInfos() {
371 return workInfos;
372 }
373
374 }
375
376 }