001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.timer;
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.Collections;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.Timer;
028    import java.util.TimerTask;
029    import javax.transaction.RollbackException;
030    import javax.transaction.Status;
031    import javax.transaction.Synchronization;
032    import javax.transaction.SystemException;
033    import javax.transaction.Transaction;
034    import javax.transaction.TransactionManager;
035    
036    import java.util.concurrent.Executor;
037    
038    import org.apache.commons.logging.Log;
039    import org.apache.commons.logging.LogFactory;
040    import org.apache.geronimo.gbean.GBeanLifecycle;
041    
042    /**
043     *
044     *
045     * @version $Rev: 520573 $ $Date: 2007-03-20 16:48:26 -0400 (Tue, 20 Mar 2007) $
046     *
047     * */
048    public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle {
049    
050        private static final Log log = LogFactory.getLog(ThreadPooledTimer.class);
051    
052        private final ExecutorTaskFactory executorTaskFactory;
053        private final WorkerPersistence workerPersistence;
054        private final Executor executor;
055        private final TransactionManager transactionManager;
056    
057        private Timer delegate;
058    
059        private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap());
060    
061        //default constructor for use as reference endpoint.
062        public ThreadPooledTimer() {
063            this(null, null, null, null);
064        }
065    
066        public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) {
067            this.executorTaskFactory = executorTaskFactory;
068            this.workerPersistence = workerPersistence;
069            this.executor = executor;
070            this.transactionManager = transactionManager;
071        }
072    
073        public void doStart() throws Exception {
074            delegate = new Timer(true);
075        }
076    
077        public void doStop() {
078            if (delegate != null) {
079                delegate.cancel();
080                delegate = null;
081            }
082        }
083    
084        public void doFail() {
085            doStop();
086        }
087    
088        public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException {
089            if (delay < 0) {
090                throw new IllegalArgumentException("Negative delay: " + delay);
091            }
092            Date time = new Date(System.currentTimeMillis() + delay);
093            return schedule(key, userTaskFactory, userId, userInfo, time);
094        }
095    
096        public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException {
097            if (time ==null) {
098                throw new IllegalArgumentException("No time supplied");
099            }
100            if (time.getTime() < 0) {
101                throw new IllegalArgumentException("Negative time: " + time.getTime());
102            }
103            WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false);
104            registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time));
105            addWorkInfo(worker);
106            return worker;
107        }
108    
109        public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException, SystemException {
110            if (delay < 0) {
111                throw new IllegalArgumentException("Negative delay: " + delay);
112            }
113            if (period < 0) {
114                throw new IllegalArgumentException("Negative period: " + period);
115            }
116            Date time = new Date(System.currentTimeMillis() + delay);
117            return schedule(key, userTaskFactory, userId, userInfo, time, period);
118        }
119    
120        public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
121            if (firstTime ==null) {
122                throw new IllegalArgumentException("No time supplied");
123            }
124            if (firstTime.getTime() < 0) {
125                throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
126            }
127            if (period < 0) {
128                throw new IllegalArgumentException("Negative period: " + period);
129            }
130            WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), false);
131            registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
132            addWorkInfo(worker);
133            return worker;
134        }
135    
136        public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException {
137            if (delay < 0) {
138                throw new IllegalArgumentException("Negative delay: " + delay);
139            }
140            if (period < 0) {
141                throw new IllegalArgumentException("Negative period: " + period);
142            }
143            Date time = new Date(System.currentTimeMillis() + delay);
144            return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period);
145        }
146    
147        public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException {
148            if (firstTime ==null) {
149                throw new IllegalArgumentException("No time supplied");
150            }
151            if (firstTime.getTime() < 0) {
152                throw new IllegalArgumentException("Negative time: " + firstTime.getTime());
153            }
154            if (period < 0) {
155                throw new IllegalArgumentException("Negative period: " + period);
156            }
157            WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), true);
158            registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period));
159            addWorkInfo(worker);
160            return worker;
161        }
162    
163        public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException {
164            PlaybackImpl playback = new PlaybackImpl(userTaskFactory);
165            workerPersistence.playback(key, playback);
166            return playback.getWorkInfos();
167        }
168    
169        public Collection getIdsByKey(String key, Object userId) throws PersistenceException {
170            return workerPersistence.getIdsByKey(key, userId);
171        }
172    
173        public WorkInfo getWorkInfo(Long id) {
174            return (WorkInfo) idToWorkInfoMap.get(id);
175        }
176    
177        /**
178         * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without
179         * affecting persisted timer data.
180         * @param ids list of ids to have their corresponding workInfo timertasks cancelled.
181         */
182        public void cancelTimerTasks(Collection ids) {
183            for (Iterator iterator = ids.iterator(); iterator.hasNext();) {
184                Long idLong = (Long) iterator.next();
185                WorkInfo workInfo = getWorkInfo(idLong);
186                if (workInfo != null) {
187                    TimerTask timerTask = workInfo.getExecutorFeedingTimerTask();
188                    timerTask.cancel();
189                }
190            }
191        }
192    
193        void addWorkInfo(WorkInfo worker) {
194            idToWorkInfoMap.put(new Long(worker.getId()), worker);
195        }
196    
197        void removeWorkInfo(WorkInfo workInfo) {
198            idToWorkInfoMap.remove(new Long(workInfo.getId()));
199        }
200    
201        void workPerformed(WorkInfo workInfo) throws PersistenceException {
202            if (workInfo.isOneTime()) {
203                workerPersistence.cancel(workInfo.getId());
204            } else if (workInfo.getAtFixedRate()) {
205                workerPersistence.fixedRateWorkPerformed(workInfo.getId());
206                workInfo.nextTime();
207            } else {
208                workInfo.nextInterval();
209                workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue());
210            }
211        }
212    
213        Timer getTimer() {
214            if (delegate == null) {
215                throw new IllegalStateException("Timer is stopped");
216            }
217            return delegate;
218        }
219    
220        WorkerPersistence getWorkerPersistence() {
221            return workerPersistence;
222        }
223    
224        Executor getExecutor() {
225            return executor;
226        }
227    
228        private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException {
229            if (time == null) {
230                throw new IllegalArgumentException("Null initial time");
231            }
232            WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate);
233            //save and assign id
234            workerPersistence.save(workInfo);
235    
236            Runnable userTask = userTaskFactory.newTask(workInfo.getId());
237            ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this);
238            ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this);
239            workInfo.initialize(worker, executorTask);
240            return workInfo;
241        }
242    
243        void registerSynchronization(Synchronization sync) throws RollbackException, SystemException {
244            Transaction transaction = transactionManager.getTransaction();
245            int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus();
246    
247            if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) {
248                transaction.registerSynchronization(sync);
249            } else {
250                sync.beforeCompletion();
251                sync.afterCompletion(Status.STATUS_COMMITTED);
252            }
253        }
254    
255        private class ScheduleSynchronization implements Synchronization {
256    
257            private final ExecutorFeedingTimerTask worker;
258            private final Date time;
259    
260            public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) {
261                this.worker = worker;
262                this.time = time;
263            }
264    
265            public void beforeCompletion() {
266            }
267    
268            public void afterCompletion(int status) {
269                if (status == Status.STATUS_COMMITTED) {
270                    if (worker.isCancelled()) {
271                        log.trace("Worker is already cancelled, not scheduling");
272                        return;
273                    }
274                    try {
275                        getTimer().schedule(worker, time);
276                    } catch (IllegalStateException e) {
277                        //TODO consider again if catching this exception is appropriate
278                        log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
279                    }
280                }
281            }
282        }
283    
284        private class ScheduleRepeatedSynchronization implements Synchronization {
285    
286            private final ExecutorFeedingTimerTask worker;
287            private final Date time;
288            private final long period;
289    
290            public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
291                this.worker = worker;
292                this.time = time;
293                this.period = period;
294            }
295    
296            public void beforeCompletion() {
297            }
298    
299            public void afterCompletion(int status) {
300                if (status == Status.STATUS_COMMITTED) {
301                    if (worker.isCancelled()) {
302                        log.trace("Worker is already cancelled, not scheduling/period");
303                        return;
304                    }
305                    try {
306                        getTimer().schedule(worker, time, period);
307                    } catch (Exception e) {
308                        log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
309                    }
310                }
311            }
312        }
313    
314        private class ScheduleAtFixedRateSynchronization implements Synchronization {
315    
316            private final ExecutorFeedingTimerTask worker;
317            private final Date time;
318            private final long period;
319    
320            public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) {
321                this.worker = worker;
322                this.time = time;
323                this.period = period;
324            }
325    
326            public void beforeCompletion() {
327            }
328    
329            public void afterCompletion(int status) {
330                if (status == Status.STATUS_COMMITTED) {
331                    if (worker.isCancelled()) {
332                        log.trace("Worker is already cancelled, not scheduleAtFixedRate");
333                        return;
334                    }
335                    try {
336                        getTimer().scheduleAtFixedRate(worker, time, period);
337                    } catch (Exception e) {
338                        log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime());
339                    }
340                }
341            }
342        }
343    
344        private class PlaybackImpl implements Playback {
345    
346            private final UserTaskFactory userTaskFactory;
347    
348            private final Collection workInfos = new ArrayList();
349    
350            public PlaybackImpl(UserTaskFactory userTaskFactory) {
351                this.userTaskFactory = userTaskFactory;
352            }
353    
354            public void schedule(WorkInfo workInfo) {
355                Runnable userTask = userTaskFactory.newTask(workInfo.getId());
356                ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this);
357                ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this);
358                workInfo.initialize(worker, executorTask);
359                if (workInfo.getPeriod() == null) {
360                    getTimer().schedule(worker, workInfo.getTime());
361                } else if (!workInfo.getAtFixedRate()) {
362                    getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
363                } else {
364                    getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue());
365                }
366                addWorkInfo(workInfo);
367                workInfos.add(workInfo);
368            }
369    
370            public Collection getWorkInfos() {
371                return workInfos;
372            }
373    
374        }
375    
376    }