001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.geronimo.timer; 020 021 import java.util.ArrayList; 022 import java.util.Collection; 023 import java.util.Collections; 024 import java.util.Date; 025 import java.util.HashMap; 026 import java.util.Iterator; 027 import java.util.Map; 028 import java.util.Timer; 029 import java.util.TimerTask; 030 import javax.transaction.RollbackException; 031 import javax.transaction.Status; 032 import javax.transaction.Synchronization; 033 import javax.transaction.SystemException; 034 import javax.transaction.Transaction; 035 import javax.transaction.TransactionManager; 036 037 import edu.emory.mathcs.backport.java.util.concurrent.Executor; 038 039 import org.apache.commons.logging.Log; 040 import org.apache.commons.logging.LogFactory; 041 import org.apache.geronimo.gbean.GBeanLifecycle; 042 043 /** 044 * 045 * 046 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $ 047 * 048 * */ 049 public class ThreadPooledTimer implements PersistentTimer, GBeanLifecycle { 050 051 private static final Log log = LogFactory.getLog(ThreadPooledTimer.class); 052 053 private final ExecutorTaskFactory executorTaskFactory; 054 private final WorkerPersistence workerPersistence; 055 private final Executor executor; 056 private final TransactionManager transactionManager; 057 058 private Timer delegate; 059 060 private final Map idToWorkInfoMap = Collections.synchronizedMap(new HashMap()); 061 062 //default constructor for use as reference endpoint. 063 public ThreadPooledTimer() { 064 this(null, null, null, null); 065 } 066 067 public ThreadPooledTimer(ExecutorTaskFactory executorTaskFactory, WorkerPersistence workerPersistence, Executor executor, TransactionManager transactionManager) { 068 this.executorTaskFactory = executorTaskFactory; 069 this.workerPersistence = workerPersistence; 070 this.executor = executor; 071 this.transactionManager = transactionManager; 072 } 073 074 public void doStart() throws Exception { 075 delegate = new Timer(true); 076 } 077 078 public void doStop() { 079 if (delegate != null) { 080 delegate.cancel(); 081 delegate = null; 082 } 083 } 084 085 public void doFail() { 086 doStop(); 087 } 088 089 public WorkInfo schedule(UserTaskFactory userTaskFactory, String key, Object userId, Object userInfo, long delay) throws PersistenceException, RollbackException, SystemException { 090 if (delay < 0) { 091 throw new IllegalArgumentException("Negative delay: " + delay); 092 } 093 Date time = new Date(System.currentTimeMillis() + delay); 094 return schedule(key, userTaskFactory, userId, userInfo, time); 095 } 096 097 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date time) throws PersistenceException, RollbackException, SystemException { 098 if (time ==null) { 099 throw new IllegalArgumentException("No time supplied"); 100 } 101 if (time.getTime() < 0) { 102 throw new IllegalArgumentException("Negative time: " + time.getTime()); 103 } 104 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, time, null, false); 105 registerSynchronization(new ScheduleSynchronization(worker.getExecutorFeedingTimerTask(), time)); 106 addWorkInfo(worker); 107 return worker; 108 } 109 110 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userInfo, long delay, long period, Object userId) throws PersistenceException, RollbackException, SystemException { 111 if (delay < 0) { 112 throw new IllegalArgumentException("Negative delay: " + delay); 113 } 114 if (period < 0) { 115 throw new IllegalArgumentException("Negative period: " + period); 116 } 117 Date time = new Date(System.currentTimeMillis() + delay); 118 return schedule(key, userTaskFactory, userId, userInfo, time, period); 119 } 120 121 public WorkInfo schedule(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException { 122 if (firstTime ==null) { 123 throw new IllegalArgumentException("No time supplied"); 124 } 125 if (firstTime.getTime() < 0) { 126 throw new IllegalArgumentException("Negative time: " + firstTime.getTime()); 127 } 128 if (period < 0) { 129 throw new IllegalArgumentException("Negative period: " + period); 130 } 131 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), false); 132 registerSynchronization(new ScheduleRepeatedSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period)); 133 addWorkInfo(worker); 134 return worker; 135 } 136 137 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, long delay, long period) throws PersistenceException, RollbackException, SystemException { 138 if (delay < 0) { 139 throw new IllegalArgumentException("Negative delay: " + delay); 140 } 141 if (period < 0) { 142 throw new IllegalArgumentException("Negative period: " + period); 143 } 144 Date time = new Date(System.currentTimeMillis() + delay); 145 return scheduleAtFixedRate(key, userTaskFactory, userId, userInfo, time, period); 146 } 147 148 public WorkInfo scheduleAtFixedRate(String key, UserTaskFactory userTaskFactory, Object userId, Object userInfo, Date firstTime, long period) throws PersistenceException, RollbackException, SystemException { 149 if (firstTime ==null) { 150 throw new IllegalArgumentException("No time supplied"); 151 } 152 if (firstTime.getTime() < 0) { 153 throw new IllegalArgumentException("Negative time: " + firstTime.getTime()); 154 } 155 if (period < 0) { 156 throw new IllegalArgumentException("Negative period: " + period); 157 } 158 WorkInfo worker = createWorker(key, userTaskFactory, executorTaskFactory, userId, userInfo, firstTime, new Long(period), true); 159 registerSynchronization(new ScheduleAtFixedRateSynchronization(worker.getExecutorFeedingTimerTask(), firstTime, period)); 160 addWorkInfo(worker); 161 return worker; 162 } 163 164 public Collection playback(String key, UserTaskFactory userTaskFactory) throws PersistenceException { 165 PlaybackImpl playback = new PlaybackImpl(userTaskFactory); 166 workerPersistence.playback(key, playback); 167 return playback.getWorkInfos(); 168 } 169 170 public Collection getIdsByKey(String key, Object userId) throws PersistenceException { 171 return workerPersistence.getIdsByKey(key, userId); 172 } 173 174 public WorkInfo getWorkInfo(Long id) { 175 return (WorkInfo) idToWorkInfoMap.get(id); 176 } 177 178 /** 179 * Called when client, eg. ejb container, is stopped and needs to cancel its timertasks without 180 * affecting persisted timer data. 181 * @param ids list of ids to have their corresponding workInfo timertasks cancelled. 182 */ 183 public void cancelTimerTasks(Collection ids) { 184 for (Iterator iterator = ids.iterator(); iterator.hasNext();) { 185 Long idLong = (Long) iterator.next(); 186 WorkInfo workInfo = getWorkInfo(idLong); 187 if (workInfo != null) { 188 TimerTask timerTask = workInfo.getExecutorFeedingTimerTask(); 189 timerTask.cancel(); 190 } 191 } 192 } 193 194 void addWorkInfo(WorkInfo worker) { 195 idToWorkInfoMap.put(new Long(worker.getId()), worker); 196 } 197 198 void removeWorkInfo(WorkInfo workInfo) { 199 idToWorkInfoMap.remove(new Long(workInfo.getId())); 200 } 201 202 void workPerformed(WorkInfo workInfo) throws PersistenceException { 203 if (workInfo.isOneTime()) { 204 workerPersistence.cancel(workInfo.getId()); 205 } else if (workInfo.getAtFixedRate()) { 206 workerPersistence.fixedRateWorkPerformed(workInfo.getId()); 207 workInfo.nextTime(); 208 } else { 209 workInfo.nextInterval(); 210 workerPersistence.intervalWorkPerformed(workInfo.getId(), workInfo.getPeriod().longValue()); 211 } 212 } 213 214 Timer getTimer() { 215 if (delegate == null) { 216 throw new IllegalStateException("Timer is stopped"); 217 } 218 return delegate; 219 } 220 221 WorkerPersistence getWorkerPersistence() { 222 return workerPersistence; 223 } 224 225 Executor getExecutor() { 226 return executor; 227 } 228 229 private WorkInfo createWorker(String key, UserTaskFactory userTaskFactory, ExecutorTaskFactory executorTaskFactory, Object userId, Object userInfo, Date time, Long period, boolean atFixedRate) throws PersistenceException { 230 if (time == null) { 231 throw new IllegalArgumentException("Null initial time"); 232 } 233 WorkInfo workInfo = new WorkInfo(key, userId, userInfo, time, period, atFixedRate); 234 //save and assign id 235 workerPersistence.save(workInfo); 236 237 Runnable userTask = userTaskFactory.newTask(workInfo.getId()); 238 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, this); 239 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, this); 240 workInfo.initialize(worker, executorTask); 241 return workInfo; 242 } 243 244 void registerSynchronization(Synchronization sync) throws RollbackException, SystemException { 245 Transaction transaction = transactionManager.getTransaction(); 246 int status = transaction == null ? Status.STATUS_NO_TRANSACTION : transaction.getStatus(); 247 248 if (transaction != null && status == Status.STATUS_ACTIVE || status == Status.STATUS_MARKED_ROLLBACK) { 249 transaction.registerSynchronization(sync); 250 } else { 251 sync.beforeCompletion(); 252 sync.afterCompletion(Status.STATUS_COMMITTED); 253 } 254 } 255 256 private class ScheduleSynchronization implements Synchronization { 257 258 private final ExecutorFeedingTimerTask worker; 259 private final Date time; 260 261 public ScheduleSynchronization(ExecutorFeedingTimerTask worker, Date time) { 262 this.worker = worker; 263 this.time = time; 264 } 265 266 public void beforeCompletion() { 267 } 268 269 public void afterCompletion(int status) { 270 if (status == Status.STATUS_COMMITTED) { 271 if (worker.isCancelled()) { 272 log.trace("Worker is already cancelled, not scheduling"); 273 return; 274 } 275 try { 276 getTimer().schedule(worker, time); 277 } catch (IllegalStateException e) { 278 //TODO consider again if catching this exception is appropriate 279 log.warn("Couldn't schedule worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime()); 280 } 281 } 282 } 283 } 284 285 private class ScheduleRepeatedSynchronization implements Synchronization { 286 287 private final ExecutorFeedingTimerTask worker; 288 private final Date time; 289 private final long period; 290 291 public ScheduleRepeatedSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) { 292 this.worker = worker; 293 this.time = time; 294 this.period = period; 295 } 296 297 public void beforeCompletion() { 298 } 299 300 public void afterCompletion(int status) { 301 if (status == Status.STATUS_COMMITTED) { 302 if (worker.isCancelled()) { 303 log.trace("Worker is already cancelled, not scheduling/period"); 304 return; 305 } 306 try { 307 getTimer().schedule(worker, time, period); 308 } catch (Exception e) { 309 log.warn("Couldn't schedule/period worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime()); 310 } 311 } 312 } 313 } 314 315 private class ScheduleAtFixedRateSynchronization implements Synchronization { 316 317 private final ExecutorFeedingTimerTask worker; 318 private final Date time; 319 private final long period; 320 321 public ScheduleAtFixedRateSynchronization(ExecutorFeedingTimerTask worker, Date time, long period) { 322 this.worker = worker; 323 this.time = time; 324 this.period = period; 325 } 326 327 public void beforeCompletion() { 328 } 329 330 public void afterCompletion(int status) { 331 if (status == Status.STATUS_COMMITTED) { 332 if (worker.isCancelled()) { 333 log.trace("Worker is already cancelled, not scheduleAtFixedRate"); 334 return; 335 } 336 try { 337 getTimer().scheduleAtFixedRate(worker, time, period); 338 } catch (Exception e) { 339 log.warn("Couldn't scheduleAtFixedRate worker " + e.getMessage() + "at (now) " + System.currentTimeMillis() + " for " + time.getTime()); 340 } 341 } 342 } 343 } 344 345 private class PlaybackImpl implements Playback { 346 347 private final UserTaskFactory userTaskFactory; 348 349 private final Collection workInfos = new ArrayList(); 350 351 public PlaybackImpl(UserTaskFactory userTaskFactory) { 352 this.userTaskFactory = userTaskFactory; 353 } 354 355 public void schedule(WorkInfo workInfo) { 356 Runnable userTask = userTaskFactory.newTask(workInfo.getId()); 357 ExecutorTask executorTask = executorTaskFactory.createExecutorTask(userTask, workInfo, ThreadPooledTimer.this); 358 ExecutorFeedingTimerTask worker = new ExecutorFeedingTimerTask(workInfo, ThreadPooledTimer.this); 359 workInfo.initialize(worker, executorTask); 360 if (workInfo.getPeriod() == null) { 361 getTimer().schedule(worker, workInfo.getTime()); 362 } else if (!workInfo.getAtFixedRate()) { 363 getTimer().schedule(worker, workInfo.getTime(), workInfo.getPeriod().longValue()); 364 } else { 365 getTimer().scheduleAtFixedRate(worker, workInfo.getTime(), workInfo.getPeriod().longValue()); 366 } 367 addWorkInfo(workInfo); 368 workInfos.add(workInfo); 369 } 370 371 public Collection getWorkInfos() { 372 return workInfos; 373 } 374 375 } 376 377 }