001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.pool; 019 020 import java.util.Collections; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.management.MalformedObjectNameException; 026 import javax.management.ObjectName; 027 import javax.management.j2ee.statistics.BoundedRangeStatistic; 028 import javax.management.j2ee.statistics.CountStatistic; 029 import javax.management.j2ee.statistics.Stats; 030 031 import java.util.concurrent.ThreadPoolExecutor; 032 import java.util.concurrent.RejectedExecutionHandler; 033 import java.util.concurrent.RejectedExecutionException; 034 import java.util.concurrent.ThreadFactory; 035 import java.util.concurrent.SynchronousQueue; 036 import java.util.concurrent.TimeUnit; 037 038 import org.apache.geronimo.gbean.GBeanInfo; 039 import org.apache.geronimo.gbean.GBeanInfoBuilder; 040 import org.apache.geronimo.gbean.GBeanLifecycle; 041 042 import org.apache.geronimo.management.J2EEManagedObject; 043 import org.apache.geronimo.management.StatisticsProvider; 044 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats; 045 import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl; 046 import org.apache.geronimo.management.stats.CountStatisticImpl; 047 import org.apache.geronimo.management.stats.StatsImpl; 048 049 /** 050 * @version $Rev: 549455 $ $Date: 2007-06-21 08:12:27 -0400 (Thu, 21 Jun 2007) $ 051 */ 052 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider { 053 private ThreadPoolExecutor executor; 054 private ClassLoader classLoader; 055 private ObjectName objectName; 056 private boolean waitWhenBlocked; 057 058 // Statistics-related fields follow 059 private boolean statsActive = true; 060 private PoolStatsImpl stats = new PoolStatsImpl(); 061 private Map clients = new HashMap(); 062 063 public ThreadPool(int poolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) { 064 ThreadPoolExecutor p = new ThreadPoolExecutor( 065 poolSize, // core size 066 poolSize, // max size 067 keepAliveTime, TimeUnit.MILLISECONDS, 068 new SynchronousQueue()); 069 070 p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 071 p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader)); 072 073 try { 074 this.objectName = ObjectName.getInstance(objectName); 075 } catch (MalformedObjectNameException e) { 076 throw new IllegalStateException("Bad object name injected: " + e.getMessage(), e); 077 } 078 079 executor = p; 080 this.classLoader = classLoader; 081 } 082 083 public String getName() { 084 return objectName.getKeyProperty("name"); 085 } 086 087 public String getObjectName() { 088 return objectName.getCanonicalName(); 089 } 090 091 public boolean isEventProvider() { 092 return true; 093 } 094 095 public boolean isStateManageable() { 096 return true; 097 } 098 099 public boolean isStatisticsProvider() { 100 return true; 101 } 102 103 public Stats getStats() { 104 stats.threadsInUse.setLowerBound(0); 105 stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize()); 106 int inUse = executor.getPoolSize(); 107 stats.threadsInUse.setCurrent(inUse); 108 if (inUse < stats.threadsInUse.getLowWaterMark()) { 109 stats.threadsInUse.setLowWaterMark(inUse); 110 } 111 if (inUse > stats.threadsInUse.getHighWaterMark()) { 112 stats.threadsInUse.setHighWaterMark(inUse); 113 } 114 if (statsActive) { 115 synchronized (this) { 116 stats.prepareConsumers(clients); 117 } 118 } else { 119 stats.prepareConsumers(Collections.EMPTY_MAP); 120 } 121 return stats; 122 } 123 124 public void resetStats() { 125 // TODO 126 } 127 128 public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats { 129 private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl( 130 "Threads In Use", "", 131 "The number of threads in use by this thread pool"); 132 private Map consumers = new HashMap(); 133 134 public PoolStatsImpl() { 135 addStat(threadsInUse.getName(), threadsInUse); 136 } 137 138 public BoundedRangeStatistic getThreadsInUse() { 139 return threadsInUse; 140 } 141 142 public CountStatistic getCountForConsumer(String consumer) { 143 return (CountStatistic) consumers.get(consumer); 144 } 145 146 public String[] getThreadConsumers() { 147 return (String[]) consumers.keySet().toArray(new String[consumers.size()]); 148 } 149 150 public void prepareConsumers(Map clients) { 151 Map result = new HashMap(); 152 for (Iterator it = clients.keySet().iterator(); it.hasNext();) { 153 String client = (String) it.next(); 154 Integer count = (Integer) clients.get(client); 155 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client); 156 if (stat == null) { 157 stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue()); 158 addStat(stat.getName(), stat); 159 } else { 160 consumers.remove(client); 161 stat.setCount(count.intValue()); 162 } 163 result.put(client, stat); 164 } 165 for (Iterator it = consumers.keySet().iterator(); it.hasNext();) { 166 String client = (String) it.next(); 167 removeStat(((CountStatisticImpl) consumers.get(client)).getName()); 168 } 169 consumers = result; 170 } 171 } 172 173 174 public int getPoolSize() { 175 return executor.getPoolSize(); 176 } 177 178 public int getMaximumPoolSize() { 179 return executor.getMaximumPoolSize(); 180 } 181 182 public int getActiveCount() { 183 return executor.getActiveCount(); 184 } 185 186 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 187 return executor.awaitTermination(timeout, unit); 188 } 189 190 public void execute(Runnable command) { 191 execute("Unknown", command); 192 } 193 194 public void execute(final String consumerName, final Runnable runnable) { 195 Runnable command; 196 if (statsActive) { 197 command = new Runnable() { 198 public void run() { 199 startWork(consumerName); 200 try { 201 runnable.run(); 202 } finally { 203 finishWork(consumerName); 204 } 205 } 206 }; 207 } else { 208 command = runnable; 209 } 210 211 ThreadPoolExecutor p; 212 synchronized (this) { 213 p = executor; 214 } 215 if (p == null) { 216 throw new IllegalStateException("ThreadPool has been stopped"); 217 } 218 Runnable task = new ContextClassLoaderRunnable(command, classLoader); 219 p.execute(task); 220 } 221 222 private synchronized void startWork(String consumerName) { 223 Integer test = (Integer) clients.get(consumerName); 224 if (test == null) { 225 clients.put(consumerName, new Integer(1)); 226 } else { 227 clients.put(consumerName, new Integer(test.intValue() + 1)); 228 } 229 } 230 231 private synchronized void finishWork(String consumerName) { 232 Integer test = (Integer) clients.get(consumerName); 233 if (test.intValue() == 1) { 234 clients.remove(consumerName); 235 } else { 236 clients.put(consumerName, new Integer(test.intValue() - 1)); 237 } 238 } 239 240 private static class WaitWhenBlockedPolicy 241 implements RejectedExecutionHandler 242 { 243 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException { 244 try { 245 executor.getQueue().put(r); 246 } 247 catch (InterruptedException e) { 248 throw new RejectedExecutionException(e); 249 } 250 } 251 } 252 253 public void setWaitWhenBlocked(boolean wait) { 254 waitWhenBlocked = wait; 255 if(wait) { 256 executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy()); 257 } else { 258 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 259 } 260 } 261 262 public boolean isWaitWhenBlocked() { 263 return waitWhenBlocked; 264 } 265 266 public void doStart() throws Exception { 267 } 268 269 public void doStop() throws Exception { 270 ThreadPoolExecutor p; 271 synchronized (this) { 272 p = executor; 273 executor = null; 274 classLoader = null; 275 } 276 if (p != null) { 277 p.shutdownNow(); 278 } 279 } 280 281 public void doFail() { 282 try { 283 doStop(); 284 } catch (Exception e) { 285 } 286 } 287 288 private static final class ThreadPoolThreadFactory implements ThreadFactory { 289 private final String poolName; 290 private final ClassLoader classLoader; 291 292 private int nextWorkerID = 0; 293 294 public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) { 295 this.poolName = poolName; 296 this.classLoader = classLoader; 297 } 298 299 public Thread newThread(Runnable arg0) { 300 Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID()); 301 thread.setContextClassLoader(classLoader); 302 return thread; 303 } 304 305 private synchronized int getNextWorkerID() { 306 return nextWorkerID++; 307 } 308 } 309 310 private static final class ContextClassLoaderRunnable implements Runnable { 311 private Runnable task; 312 private ClassLoader classLoader; 313 314 public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) { 315 this.task = task; 316 this.classLoader = classLoader; 317 } 318 319 public void run() { 320 Runnable myTask = task; 321 ClassLoader myClassLoader = classLoader; 322 323 // clear fields so they can be garbage collected 324 task = null; 325 classLoader = null; 326 327 if (myClassLoader != null) { 328 // we asumme the thread classloader is already set to our final class loader 329 // because the only to access the thread is wrapped with the Runnable or via the initial thread pool 330 try { 331 myTask.run(); 332 } finally { 333 Thread.currentThread().setContextClassLoader(myClassLoader); 334 } 335 } 336 } 337 } 338 339 public static final GBeanInfo GBEAN_INFO; 340 341 static { 342 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class, "GBean"); 343 344 infoFactory.addAttribute("poolSize", int.class, true); 345 infoFactory.addAttribute("poolName", String.class, true); 346 infoFactory.addAttribute("keepAliveTime", long.class, true); 347 infoFactory.addAttribute("waitWhenBlocked", boolean.class, true); 348 349 infoFactory.addAttribute("objectName", String.class, false); 350 infoFactory.addAttribute("classLoader", ClassLoader.class, false); 351 352 infoFactory.addInterface(GeronimoExecutor.class); 353 354 infoFactory.setConstructor(new String[]{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"}); 355 356 GBEAN_INFO = infoFactory.getBeanInfo(); 357 } 358 359 public static GBeanInfo getGBeanInfo() { 360 return GBEAN_INFO; 361 } 362 }