001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018 package org.apache.geronimo.pool; 019 020 import java.util.Collections; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.Map; 024 025 import javax.management.MalformedObjectNameException; 026 import javax.management.ObjectName; 027 import javax.management.j2ee.statistics.BoundedRangeStatistic; 028 import javax.management.j2ee.statistics.CountStatistic; 029 import javax.management.j2ee.statistics.Stats; 030 031 import java.util.concurrent.ThreadPoolExecutor; 032 import java.util.concurrent.RejectedExecutionHandler; 033 import java.util.concurrent.RejectedExecutionException; 034 import java.util.concurrent.ThreadFactory; 035 import java.util.concurrent.SynchronousQueue; 036 import java.util.concurrent.TimeUnit; 037 038 import org.apache.geronimo.gbean.GBeanInfo; 039 import org.apache.geronimo.gbean.GBeanInfoBuilder; 040 import org.apache.geronimo.gbean.GBeanLifecycle; 041 042 import org.apache.geronimo.management.J2EEManagedObject; 043 import org.apache.geronimo.management.StatisticsProvider; 044 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats; 045 import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl; 046 import org.apache.geronimo.management.stats.CountStatisticImpl; 047 import org.apache.geronimo.management.stats.StatsImpl; 048 049 /** 050 * @version $Rev: 706640 $ $Date: 2008-10-21 14:44:05 +0000 (Tue, 21 Oct 2008) $ 051 */ 052 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider { 053 private ThreadPoolExecutor executor; 054 private ClassLoader classLoader; 055 private ObjectName objectName; 056 private boolean waitWhenBlocked; 057 058 // Statistics-related fields follow 059 private boolean statsActive = true; 060 private PoolStatsImpl stats = new PoolStatsImpl(); 061 private Map clients = new HashMap(); 062 063 public ThreadPool(int minPoolSize, int maxPoolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) { 064 ThreadPoolExecutor p = new ThreadPoolExecutor( 065 minPoolSize, // core size 066 maxPoolSize, // max size 067 keepAliveTime, TimeUnit.MILLISECONDS, 068 new SynchronousQueue()); 069 070 p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 071 p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader)); 072 073 try { 074 this.objectName = ObjectName.getInstance(objectName); 075 } catch (MalformedObjectNameException e) { 076 throw new IllegalStateException("Bad object name injected: " + e.getMessage(), e); 077 } 078 079 executor = p; 080 this.classLoader = classLoader; 081 082 // set pool stats start time 083 stats.setStartTime(); 084 } 085 086 public String getName() { 087 return objectName.getKeyProperty("name"); 088 } 089 090 public String getObjectName() { 091 return objectName.getCanonicalName(); 092 } 093 094 public boolean isEventProvider() { 095 return true; 096 } 097 098 public boolean isStateManageable() { 099 return true; 100 } 101 102 public boolean isStatisticsProvider() { 103 return true; 104 } 105 106 public Stats getStats() { 107 stats.threadsInUse.setLowerBound(0); 108 stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize()); 109 int inUse = executor.getPoolSize(); 110 stats.threadsInUse.setCurrent(inUse); 111 if (inUse < stats.threadsInUse.getLowWaterMark()) { 112 stats.threadsInUse.setLowWaterMark(inUse); 113 } 114 if (inUse > stats.threadsInUse.getHighWaterMark()) { 115 stats.threadsInUse.setHighWaterMark(inUse); 116 } 117 if (statsActive) { 118 synchronized (this) { 119 stats.prepareConsumers(clients); 120 } 121 } else { 122 stats.prepareConsumers(Collections.EMPTY_MAP); 123 } 124 // set last sapmle time 125 stats.setLastSampleTime(); 126 return stats; 127 } 128 129 /** 130 * Reset all statistics in PoolStatsImpl object 131 */ 132 public void resetStats() { 133 stats.threadsInUse.setLowerBound(0); 134 stats.threadsInUse.setUpperBound(0); 135 stats.threadsInUse.setCurrent(0); 136 stats.threadsInUse.setLowWaterMark(0); 137 stats.threadsInUse.setHighWaterMark(0); 138 stats.setStartTime(); 139 } 140 141 public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats { 142 private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl( 143 "Threads In Use", "", 144 "The number of threads in use by this thread pool"); 145 private Map consumers = new HashMap(); 146 147 public PoolStatsImpl() { 148 addStat(threadsInUse.getName(), threadsInUse); 149 } 150 151 public BoundedRangeStatistic getThreadsInUse() { 152 return threadsInUse; 153 } 154 155 public CountStatistic getCountForConsumer(String consumer) { 156 return (CountStatistic) consumers.get(consumer); 157 } 158 159 public String[] getThreadConsumers() { 160 return (String[]) consumers.keySet().toArray(new String[consumers.size()]); 161 } 162 163 public void prepareConsumers(Map clients) { 164 Map result = new HashMap(); 165 for (Iterator it = clients.keySet().iterator(); it.hasNext();) { 166 String client = (String) it.next(); 167 Integer count = (Integer) clients.get(client); 168 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client); 169 if (stat == null) { 170 stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue()); 171 addStat(stat.getName(), stat); 172 } else { 173 consumers.remove(client); 174 stat.setCount(count.intValue()); 175 } 176 result.put(client, stat); 177 } 178 for (Iterator it = consumers.keySet().iterator(); it.hasNext();) { 179 String client = (String) it.next(); 180 removeStat(((CountStatisticImpl) consumers.get(client)).getName()); 181 } 182 consumers = result; 183 } 184 } 185 186 187 public int getPoolSize() { 188 return executor.getPoolSize(); 189 } 190 191 public int getMaximumPoolSize() { 192 return executor.getMaximumPoolSize(); 193 } 194 195 public int getActiveCount() { 196 return executor.getActiveCount(); 197 } 198 199 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 200 return executor.awaitTermination(timeout, unit); 201 } 202 203 public void execute(Runnable command) { 204 execute("Unknown", command); 205 } 206 207 public void execute(final String consumerName, final Runnable runnable) { 208 Runnable command; 209 if (statsActive) { 210 command = new Runnable() { 211 public void run() { 212 startWork(consumerName); 213 try { 214 runnable.run(); 215 } finally { 216 finishWork(consumerName); 217 } 218 } 219 }; 220 } else { 221 command = runnable; 222 } 223 224 ThreadPoolExecutor p; 225 synchronized (this) { 226 p = executor; 227 } 228 if (p == null) { 229 throw new IllegalStateException("ThreadPool has been stopped"); 230 } 231 Runnable task = new ContextClassLoaderRunnable(command, classLoader); 232 p.execute(task); 233 } 234 235 private synchronized void startWork(String consumerName) { 236 Integer test = (Integer) clients.get(consumerName); 237 if (test == null) { 238 clients.put(consumerName, new Integer(1)); 239 } else { 240 clients.put(consumerName, new Integer(test.intValue() + 1)); 241 } 242 } 243 244 private synchronized void finishWork(String consumerName) { 245 Integer test = (Integer) clients.get(consumerName); 246 if (test.intValue() == 1) { 247 clients.remove(consumerName); 248 } else { 249 clients.put(consumerName, new Integer(test.intValue() - 1)); 250 } 251 } 252 253 private static class WaitWhenBlockedPolicy 254 implements RejectedExecutionHandler 255 { 256 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException { 257 try { 258 executor.getQueue().put(r); 259 } 260 catch (InterruptedException e) { 261 throw new RejectedExecutionException(e); 262 } 263 } 264 } 265 266 public void setWaitWhenBlocked(boolean wait) { 267 waitWhenBlocked = wait; 268 if(wait) { 269 executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy()); 270 } else { 271 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 272 } 273 } 274 275 public boolean isWaitWhenBlocked() { 276 return waitWhenBlocked; 277 } 278 279 public void doStart() throws Exception { 280 } 281 282 public void doStop() throws Exception { 283 ThreadPoolExecutor p; 284 synchronized (this) { 285 p = executor; 286 executor = null; 287 classLoader = null; 288 } 289 if (p != null) { 290 p.shutdownNow(); 291 } 292 } 293 294 public void doFail() { 295 try { 296 doStop(); 297 } catch (Exception e) { 298 } 299 } 300 301 private static final class ThreadPoolThreadFactory implements ThreadFactory { 302 private final String poolName; 303 private final ClassLoader classLoader; 304 305 private int nextWorkerID = 0; 306 307 public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) { 308 this.poolName = poolName; 309 this.classLoader = classLoader; 310 } 311 312 public Thread newThread(Runnable arg0) { 313 Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID()); 314 thread.setContextClassLoader(classLoader); 315 return thread; 316 } 317 318 private synchronized int getNextWorkerID() { 319 return nextWorkerID++; 320 } 321 } 322 323 private static final class ContextClassLoaderRunnable implements Runnable { 324 private Runnable task; 325 private ClassLoader classLoader; 326 327 public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) { 328 this.task = task; 329 this.classLoader = classLoader; 330 } 331 332 public void run() { 333 Runnable myTask = task; 334 ClassLoader myClassLoader = classLoader; 335 336 // clear fields so they can be garbage collected 337 task = null; 338 classLoader = null; 339 340 if (myClassLoader != null) { 341 // we asumme the thread classloader is already set to our final class loader 342 // because the only to access the thread is wrapped with the Runnable or via the initial thread pool 343 try { 344 myTask.run(); 345 } finally { 346 Thread.currentThread().setContextClassLoader(myClassLoader); 347 } 348 } 349 } 350 } 351 352 public static final GBeanInfo GBEAN_INFO; 353 354 static { 355 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class, "GBean"); 356 357 infoFactory.addAttribute("minPoolSize", int.class, true); 358 infoFactory.addAttribute("maxPoolSize", int.class, true); 359 infoFactory.addAttribute("poolName", String.class, true); 360 infoFactory.addAttribute("keepAliveTime", long.class, true); 361 infoFactory.addAttribute("waitWhenBlocked", boolean.class, true); 362 363 infoFactory.addAttribute("objectName", String.class, false); 364 infoFactory.addAttribute("classLoader", ClassLoader.class, false); 365 366 infoFactory.addInterface(GeronimoExecutor.class); 367 368 infoFactory.setConstructor(new String[]{"minPoolSize", "maxPoolSize", "poolName", "keepAliveTime", "classLoader", "objectName"}); 369 370 GBEAN_INFO = infoFactory.getBeanInfo(); 371 } 372 373 public static GBeanInfo getGBeanInfo() { 374 return GBEAN_INFO; 375 } 376 }