001    /**
002     *
003     *  Licensed to the Apache Software Foundation (ASF) under one or more
004     *  contributor license agreements.  See the NOTICE file distributed with
005     *  this work for additional information regarding copyright ownership.
006     *  The ASF licenses this file to You under the Apache License, Version 2.0
007     *  (the "License"); you may not use this file except in compliance with
008     *  the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     *  Unless required by applicable law or agreed to in writing, software
013     *  distributed under the License is distributed on an "AS IS" BASIS,
014     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     *  See the License for the specific language governing permissions and
016     *  limitations under the License.
017     */
018    
019    package org.apache.geronimo.timer;
020    
021    import java.util.ArrayList;
022    import java.util.Collection;
023    import java.util.Collections;
024    import java.util.Date;
025    import java.util.HashMap;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.util.Timer;
029    import java.util.TimerTask;
030    import javax.transaction.RollbackException;
031    import javax.transaction.Status;
032    import javax.transaction.Synchronization;
033    import javax.transaction.SystemException;
034    import javax.transaction.Transaction;
035    import javax.transaction.TransactionManager;
036    
037    import edu.emory.mathcs.backport.java.util.concurrent.Executor;
038    
039    import org.apache.commons.logging.Log;
040    import org.apache.commons.logging.LogFactory;
041    import org.apache.geronimo.gbean.GBeanLifecycle;
042    
043    /**
044     *
045     *
046     * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
047     *
048     * */
049    public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle {
050    
051        private static final Log log = LogFactory.getLog(ThreadPooledTimer.class);
052    
053        private final ExecutorTaskFactory executorTaskFactory;
054        private final WorkerPersistence workerPersistence;
055        private final Executor executor;
056        private final TransactionManager transactionManager;
057    
058        private Timer delegate;
059    
060        private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap());
061    
062        //default constructor for use as reference endpoint.
063        public ThreadPooledTimer() {
064            this(null, null, null, null);
065        }
066    
067        public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) {
068            this.executorTaskFactory = executorTaskFactory;
069            this.workerPersistence = workerPersistence;
070            this.executor = executor;
071            this.transactionManager = transactionManager;
072        }
073    
074        public void doStart() throws Exception {
075            delegate = new Timer(true);
076        }
077    
078        public void doStop() {
079            if (delegate != null) {
080                delegate.cancel();
081                delegate = null;
082            }
083        }
084    
085        public void doFail() {
086            doStop();
087        }
088    
089        public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException {
090            if (delay < 0) {
091                throw new IllegalArgumentException("Negative delay: " + delay);
092            }
093            Date time = new Date(System.currentTimeMillis() + delay);
094            return schedule(key, userTaskFactory, userId, userInfo, time);
095        }
096    
097        public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException {
098            if (time ==null) {
099                throw new IllegalArgumentException("No time supplied");
100            }
101            if (time.getTime() < 0) {
102                throw new IllegalArgumentException("Negative time: " + time.getTime());
103            }
104            WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false);
105            registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time));
106            addWorkInfo(worker);
107            return worker;
108        }
109    
110        public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException, SystemException {
111            if (delay < 0) {
112                throw new IllegalArgumentException("Negative delay: " + delay);
113            }
114            if (period < 0) {
115                throw new IllegalArgumentException("Negative period: " + period);
116            }
117            Date time = new Date(System.currentTimeMillis() + delay);
118            return schedule(key, userTaskFactory, userId, userInfo, time, period);
119        }
120    
121        public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
122            if (firstTime ==null) {
123                throw new IllegalArgumentException("No time supplied");
124            }
125            if (firstTime.getTime() < 0) {
126                throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
127            }
128            if (period < 0) {
129                throw new IllegalArgumentException("Negative period: " + period);
130            }
131            WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), false);
132            registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
133            addWorkInfo(worker);
134            return worker;
135        }
136    
137        public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException {
138            if (delay < 0) {
139                throw new IllegalArgumentException("Negative delay: " + delay);
140            }
141            if (period < 0) {
142                throw new IllegalArgumentException("Negative period: " + period);
143            }
144            Date time = new Date(System.currentTimeMillis() + delay);
145            return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period);
146        }
147    
148        public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
149            if (firstTime ==null) {
150                throw new IllegalArgumentException("No time supplied");
151            }
152            if (firstTime.getTime() < 0) {
153                throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
154            }
155            if (period < 0) {
156                throw new IllegalArgumentException("Negative period: " + period);
157            }
158            WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), true);
159            registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
160            addWorkInfo(worker);
161            return worker;
162        }
163    
164        public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException {
165            PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
166            workerPersistence.playback(key, playback);
167            return playback.getWorkInfos();
168        }
169    
170        public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
171            return workerPersistence.getIdsByKey(key, userId);
172        }
173    
174        public WorkInfo getWorkInfo(Long id) {
175            return (WorkInfo) idToWorkInfoMap.get(id);
176        }
177    
178        /**
179         * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
180         * affecting persisted timer data.
181         * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
182         */
183        public void cancelTimerTasks(Collection ids) {
184            for (Iterator iterator = ids.iterator(); iterator.hasNext();) {
185                Long idLong = (Long) iterator.next();
186                WorkInfo workInfo = getWorkInfo(idLong);
187                if (workInfo != null) {
188                    TimerTask timerTask = workInfo.getExecutorFeedingTimerTask();
189                    timerTask.cancel();
190                }
191            }
192        }
193    
194        void addWorkInfo(WorkInfo worker) {
195            idToWorkInfoMap.put(new Long(worker.getId()), worker);
196        }
197    
198        void removeWorkInfo(WorkInfo workInfo) {
199            idToWorkInfoMap.remove(new Long(workInfo.getId()));
200        }
201    
202        void workPerformed(WorkInfo workInfo) throws PersistenceException {
203            if (workInfo.isOneTime()) {
204                workerPersistence.cancel(workInfo.getId());
205            } else if (workInfo.getAtFixedRate()) {
206                workerPersistence.fixedRateWorkPerformed(workInfo.getId());
207                workInfo.nextTime();
208            } else {
209                workInfo.nextInterval();
210                workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue());
211            }
212        }
213    
214        Timer getTimer() {
215            if (delegate == null) {
216                throw new IllegalStateException("Timer is stopped");
217            }
218            return delegate;
219        }
220    
221        WorkerPersistence getWorkerPersistence() {
222            return workerPersistence;
223        }
224    
225        Executor getExecutor() {
226            return executor;
227        }
228    
229        private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException {
230            if (time == null) {
231                throw new IllegalArgumentException("Null initial time");
232            }
233            WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
234            //save and assign id
235            workerPersistence.save(workInfo);
236    
237            Runnable userTask = userTaskFactory.newTask(workInfo.getId());
238            ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this);
239            ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this);
240            workInfo.initialize(worker, executorTask);
241            return workInfo;
242        }
243    
244        void registerSynchronization(Synchronization sync) throws RollbackException, SystemException {
245            Transaction transaction = transactionManager.getTransaction();
246            int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus();
247    
248            if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
249                transaction.registerSynchronization(sync);
250            } else {
251                sync.beforeCompletion();
252                sync.afterCompletion(Status.STATUS_COMMITTED);
253            }
254        }
255    
256        private class ScheduleSynchronization implements Synchronization {
257    
258            private final ExecutorFeedingTimerTask worker;
259            private final Date time;
260    
261            public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) {
262                this.worker = worker;
263                this.time = time;
264            }
265    
266            public void beforeCompletion() {
267            }
268    
269            public void afterCompletion(int status) {
270                if (status == Status.STATUS_COMMITTED) {
271                    if (worker.isCancelled()) {
272                        log.trace("Worker is already cancelled, not scheduling");
273                        return;
274                    }
275                    try {
276                        getTimer().schedule(worker, time);
277                    } catch (IllegalStateException e) {
278                        //TODO consider again if catching this exception is appropriate
279                        log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
280                    }
281                }
282            }
283        }
284    
285        private class ScheduleRepeatedSynchronization implements Synchronization {
286    
287            private final ExecutorFeedingTimerTask worker;
288            private final Date time;
289            private final long period;
290    
291            public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
292                this.worker = worker;
293                this.time = time;
294                this.period = period;
295            }
296    
297            public void beforeCompletion() {
298            }
299    
300            public void afterCompletion(int status) {
301                if (status == Status.STATUS_COMMITTED) {
302                    if (worker.isCancelled()) {
303                        log.trace("Worker is already cancelled, not scheduling/period");
304                        return;
305                    }
306                    try {
307                        getTimer().schedule(worker, time, period);
308                    } catch (Exception e) {
309                        log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
310                    }
311                }
312            }
313        }
314    
315        private class ScheduleAtFixedRateSynchronization implements Synchronization {
316    
317            private final ExecutorFeedingTimerTask worker;
318            private final Date time;
319            private final long period;
320    
321            public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
322                this.worker = worker;
323                this.time = time;
324                this.period = period;
325            }
326    
327            public void beforeCompletion() {
328            }
329    
330            public void afterCompletion(int status) {
331                if (status == Status.STATUS_COMMITTED) {
332                    if (worker.isCancelled()) {
333                        log.trace("Worker is already cancelled, not scheduleAtFixedRate");
334                        return;
335                    }
336                    try {
337                        getTimer().scheduleAtFixedRate(worker, time, period);
338                    } catch (Exception e) {
339                        log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
340                    }
341                }
342            }
343        }
344    
345        private class PlaybackImpl implements Playback {
346    
347            private final UserTaskFactory userTaskFactory;
348    
349            private final Collection workInfos = new ArrayList();
350    
351            public PlaybackImpl(UserTaskFactory userTaskFactory) {
352                this.userTaskFactory = userTaskFactory;
353            }
354    
355            public void schedule(WorkInfo workInfo) {
356                Runnable userTask = userTaskFactory.newTask(workInfo.getId());
357                ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this);
358                ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this);
359                workInfo.initialize(worker, executorTask);
360                if (workInfo.getPeriod() == null) {
361                    getTimer().schedule(worker, workInfo.getTime());
362                } else if (!workInfo.getAtFixedRate()) {
363                    getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
364                } else {
365                    getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
366                }
367                addWorkInfo(workInfo);
368                workInfos.add(workInfo);
369            }
370    
371            public Collection getWorkInfos() {
372                return workInfos;
373            }
374    
375        }
376    
377    }