View Javadoc

1   /**
2    *
3    *  Licensed to the Apache Software Foundation (ASF) under one or more
4    *  contributor license agreements.  See the NOTICE file distributed with
5    *  this work for additional information regarding copyright ownership.
6    *  The ASF licenses this file to You under the Apache License, Version 2.0
7    *  (the "License"); you may not use this file except in compliance with
8    *  the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing, software
13   *  distributed under the License is distributed on an "AS IS" BASIS,
14   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *  See the License for the specific language governing permissions and
16   *  limitations under the License.
17   */
18  
19  package org.apache.geronimo.timer;
20  
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Collections;
24  import java.util.Date;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.Map;
28  import java.util.Timer;
29  import java.util.TimerTask;
30  import javax.transaction.RollbackException;
31  import javax.transaction.Status;
32  import javax.transaction.Synchronization;
33  import javax.transaction.SystemException;
34  import javax.transaction.Transaction;
35  import javax.transaction.TransactionManager;
36  
37  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.geronimo.gbean.GBeanLifecycle;
42  
43  /**
44   *
45   *
46   * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
47   *
48   * */
49  public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle {
50  
51      private static final Log log = LogFactory.getLog(ThreadPooledTimer.class);
52  
53      private final ExecutorTaskFactory executorTaskFactory;
54      private final WorkerPersistence workerPersistence;
55      private final Executor executor;
56      private final TransactionManager transactionManager;
57  
58      private Timer delegate;
59  
60      private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap());
61  
62      //default constructor for use as reference endpoint.
63      public ThreadPooledTimer() {
64          this(null, null, null, null);
65      }
66  
67      public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) {
68          this.executorTaskFactory = executorTaskFactory;
69          this.workerPersistence = workerPersistence;
70          this.executor = executor;
71          this.transactionManager = transactionManager;
72      }
73  
74      public void doStart() throws Exception {
75          delegate = new Timer(true);
76      }
77  
78      public void doStop() {
79          if (delegate != null) {
80              delegate.cancel();
81              delegate = null;
82          }
83      }
84  
85      public void doFail() {
86          doStop();
87      }
88  
89      public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException {
90          if (delay < 0) {
91              throw new IllegalArgumentException("Negative delay: " + delay);
92          }
93          Date time = new Date(System.currentTimeMillis() + delay);
94          return schedule(key, userTaskFactory, userId, userInfo, time);
95      }
96  
97      public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException {
98          if (time ==null) {
99              throw new IllegalArgumentException("No time supplied");
100         }
101         if (time.getTime() < 0) {
102             throw new IllegalArgumentException("Negative time: " + time.getTime());
103         }
104         WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false);
105         registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time));
106         addWorkInfo(worker);
107         return worker;
108     }
109 
110     public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException, SystemException {
111         if (delay < 0) {
112             throw new IllegalArgumentException("Negative delay: " + delay);
113         }
114         if (period < 0) {
115             throw new IllegalArgumentException("Negative period: " + period);
116         }
117         Date time = new Date(System.currentTimeMillis() + delay);
118         return schedule(key, userTaskFactory, userId, userInfo, time, period);
119     }
120 
121     public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
122         if (firstTime ==null) {
123             throw new IllegalArgumentException("No time supplied");
124         }
125         if (firstTime.getTime() < 0) {
126             throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
127         }
128         if (period < 0) {
129             throw new IllegalArgumentException("Negative period: " + period);
130         }
131         WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), false);
132         registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
133         addWorkInfo(worker);
134         return worker;
135     }
136 
137     public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException {
138         if (delay < 0) {
139             throw new IllegalArgumentException("Negative delay: " + delay);
140         }
141         if (period < 0) {
142             throw new IllegalArgumentException("Negative period: " + period);
143         }
144         Date time = new Date(System.currentTimeMillis() + delay);
145         return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period);
146     }
147 
148     public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
149         if (firstTime ==null) {
150             throw new IllegalArgumentException("No time supplied");
151         }
152         if (firstTime.getTime() < 0) {
153             throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
154         }
155         if (period < 0) {
156             throw new IllegalArgumentException("Negative period: " + period);
157         }
158         WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), true);
159         registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
160         addWorkInfo(worker);
161         return worker;
162     }
163 
164     public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException {
165         PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
166         workerPersistence.playback(key, playback);
167         return playback.getWorkInfos();
168     }
169 
170     public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
171         return workerPersistence.getIdsByKey(key, userId);
172     }
173 
174     public WorkInfo getWorkInfo(Long id) {
175         return (WorkInfo) idToWorkInfoMap.get(id);
176     }
177 
178     /**
179      * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
180      * affecting persisted timer data.
181      * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
182      */
183     public void cancelTimerTasks(Collection ids) {
184         for (Iterator iterator = ids.iterator(); iterator.hasNext();) {
185             Long idLong = (Long) iterator.next();
186             WorkInfo workInfo = getWorkInfo(idLong);
187             if (workInfo != null) {
188                 TimerTask timerTask = workInfo.getExecutorFeedingTimerTask();
189                 timerTask.cancel();
190             }
191         }
192     }
193 
194     void addWorkInfo(WorkInfo worker) {
195         idToWorkInfoMap.put(new Long(worker.getId()), worker);
196     }
197 
198     void removeWorkInfo(WorkInfo workInfo) {
199         idToWorkInfoMap.remove(new Long(workInfo.getId()));
200     }
201 
202     void workPerformed(WorkInfo workInfo) throws PersistenceException {
203         if (workInfo.isOneTime()) {
204             workerPersistence.cancel(workInfo.getId());
205         } else if (workInfo.getAtFixedRate()) {
206             workerPersistence.fixedRateWorkPerformed(workInfo.getId());
207             workInfo.nextTime();
208         } else {
209             workInfo.nextInterval();
210             workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue());
211         }
212     }
213 
214     Timer getTimer() {
215         if (delegate == null) {
216             throw new IllegalStateException("Timer is stopped");
217         }
218         return delegate;
219     }
220 
221     WorkerPersistence getWorkerPersistence() {
222         return workerPersistence;
223     }
224 
225     Executor getExecutor() {
226         return executor;
227     }
228 
229     private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException {
230         if (time == null) {
231             throw new IllegalArgumentException("Null initial time");
232         }
233         WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
234         //save and assign id
235         workerPersistence.save(workInfo);
236 
237         Runnable userTask = userTaskFactory.newTask(workInfo.getId());
238         ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this);
239         ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this);
240         workInfo.initialize(worker, executorTask);
241         return workInfo;
242     }
243 
244     void registerSynchronization(Synchronization sync) throws RollbackException, SystemException {
245         Transaction transaction = transactionManager.getTransaction();
246         int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus();
247 
248         if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
249             transaction.registerSynchronization(sync);
250         } else {
251             sync.beforeCompletion();
252             sync.afterCompletion(Status.STATUS_COMMITTED);
253         }
254     }
255 
256     private class ScheduleSynchronization implements Synchronization {
257 
258         private final ExecutorFeedingTimerTask worker;
259         private final Date time;
260 
261         public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) {
262             this.worker = worker;
263             this.time = time;
264         }
265 
266         public void beforeCompletion() {
267         }
268 
269         public void afterCompletion(int status) {
270             if (status == Status.STATUS_COMMITTED) {
271                 if (worker.isCancelled()) {
272                     log.trace("Worker is already cancelled, not scheduling");
273                     return;
274                 }
275                 try {
276                     getTimer().schedule(worker, time);
277                 } catch (IllegalStateException e) {
278                     //TODO consider again if catching this exception is appropriate
279                     log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
280                 }
281             }
282         }
283     }
284 
285     private class ScheduleRepeatedSynchronization implements Synchronization {
286 
287         private final ExecutorFeedingTimerTask worker;
288         private final Date time;
289         private final long period;
290 
291         public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
292             this.worker = worker;
293             this.time = time;
294             this.period = period;
295         }
296 
297         public void beforeCompletion() {
298         }
299 
300         public void afterCompletion(int status) {
301             if (status == Status.STATUS_COMMITTED) {
302                 if (worker.isCancelled()) {
303                     log.trace("Worker is already cancelled, not scheduling/period");
304                     return;
305                 }
306                 try {
307                     getTimer().schedule(worker, time, period);
308                 } catch (Exception e) {
309                     log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
310                 }
311             }
312         }
313     }
314 
315     private class ScheduleAtFixedRateSynchronization implements Synchronization {
316 
317         private final ExecutorFeedingTimerTask worker;
318         private final Date time;
319         private final long period;
320 
321         public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
322             this.worker = worker;
323             this.time = time;
324             this.period = period;
325         }
326 
327         public void beforeCompletion() {
328         }
329 
330         public void afterCompletion(int status) {
331             if (status == Status.STATUS_COMMITTED) {
332                 if (worker.isCancelled()) {
333                     log.trace("Worker is already cancelled, not scheduleAtFixedRate");
334                     return;
335                 }
336                 try {
337                     getTimer().scheduleAtFixedRate(worker, time, period);
338                 } catch (Exception e) {
339                     log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
340                 }
341             }
342         }
343     }
344 
345     private class PlaybackImpl implements Playback {
346 
347         private final UserTaskFactory userTaskFactory;
348 
349         private final Collection workInfos = new ArrayList();
350 
351         public PlaybackImpl(UserTaskFactory userTaskFactory) {
352             this.userTaskFactory = userTaskFactory;
353         }
354 
355         public void schedule(WorkInfo workInfo) {
356             Runnable userTask = userTaskFactory.newTask(workInfo.getId());
357             ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this);
358             ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this);
359             workInfo.initialize(worker, executorTask);
360             if (workInfo.getPeriod() == null) {
361                 getTimer().schedule(worker, workInfo.getTime());
362             } else if (!workInfo.getAtFixedRate()) {
363                 getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
364             } else {
365                 getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
366             }
367             addWorkInfo(workInfo);
368             workInfos.add(workInfo);
369         }
370 
371         public Collection getWorkInfos() {
372             return workInfos;
373         }
374 
375     }
376 
377 }