001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.pool;
019
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.management.MalformedObjectNameException;
026 import javax.management.ObjectName;
027 import javax.management.j2ee.statistics.BoundedRangeStatistic;
028 import javax.management.j2ee.statistics.CountStatistic;
029 import javax.management.j2ee.statistics.Stats;
030
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.RejectedExecutionHandler;
033 import java.util.concurrent.RejectedExecutionException;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.SynchronousQueue;
036 import java.util.concurrent.TimeUnit;
037
038 import org.apache.geronimo.gbean.GBeanInfo;
039 import org.apache.geronimo.gbean.GBeanInfoBuilder;
040 import org.apache.geronimo.gbean.GBeanLifecycle;
041
042 import org.apache.geronimo.management.J2EEManagedObject;
043 import org.apache.geronimo.management.StatisticsProvider;
044 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
045 import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl;
046 import org.apache.geronimo.management.stats.CountStatisticImpl;
047 import org.apache.geronimo.management.stats.StatsImpl;
048
049 /**
050 * @version $Rev: 706640 $ $Date: 2008-10-21 14:44:05 +0000 (Tue, 21 Oct 2008) $
051 */
052 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
053 private ThreadPoolExecutor executor;
054 private ClassLoader classLoader;
055 private ObjectName objectName;
056 private boolean waitWhenBlocked;
057
058 // Statistics-related fields follow
059 private boolean statsActive = true;
060 private PoolStatsImpl stats = new PoolStatsImpl();
061 private Map clients = new HashMap();
062
063 public ThreadPool(int minPoolSize, int maxPoolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) {
064 ThreadPoolExecutor p = new ThreadPoolExecutor(
065 minPoolSize, // core size
066 maxPoolSize, // max size
067 keepAliveTime, TimeUnit.MILLISECONDS,
068 new SynchronousQueue());
069
070 p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
071 p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
072
073 try {
074 this.objectName = ObjectName.getInstance(objectName);
075 } catch (MalformedObjectNameException e) {
076 throw new IllegalStateException("Bad object name injected: " + e.getMessage(), e);
077 }
078
079 executor = p;
080 this.classLoader = classLoader;
081
082 // set pool stats start time
083 stats.setStartTime();
084 }
085
086 public String getName() {
087 return objectName.getKeyProperty("name");
088 }
089
090 public String getObjectName() {
091 return objectName.getCanonicalName();
092 }
093
094 public boolean isEventProvider() {
095 return true;
096 }
097
098 public boolean isStateManageable() {
099 return true;
100 }
101
102 public boolean isStatisticsProvider() {
103 return true;
104 }
105
106 public Stats getStats() {
107 stats.threadsInUse.setLowerBound(0);
108 stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
109 int inUse = executor.getPoolSize();
110 stats.threadsInUse.setCurrent(inUse);
111 if (inUse < stats.threadsInUse.getLowWaterMark()) {
112 stats.threadsInUse.setLowWaterMark(inUse);
113 }
114 if (inUse > stats.threadsInUse.getHighWaterMark()) {
115 stats.threadsInUse.setHighWaterMark(inUse);
116 }
117 if (statsActive) {
118 synchronized (this) {
119 stats.prepareConsumers(clients);
120 }
121 } else {
122 stats.prepareConsumers(Collections.EMPTY_MAP);
123 }
124 // set last sapmle time
125 stats.setLastSampleTime();
126 return stats;
127 }
128
129 /**
130 * Reset all statistics in PoolStatsImpl object
131 */
132 public void resetStats() {
133 stats.threadsInUse.setLowerBound(0);
134 stats.threadsInUse.setUpperBound(0);
135 stats.threadsInUse.setCurrent(0);
136 stats.threadsInUse.setLowWaterMark(0);
137 stats.threadsInUse.setHighWaterMark(0);
138 stats.setStartTime();
139 }
140
141 public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
142 private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl(
143 "Threads In Use", "",
144 "The number of threads in use by this thread pool");
145 private Map consumers = new HashMap();
146
147 public PoolStatsImpl() {
148 addStat(threadsInUse.getName(), threadsInUse);
149 }
150
151 public BoundedRangeStatistic getThreadsInUse() {
152 return threadsInUse;
153 }
154
155 public CountStatistic getCountForConsumer(String consumer) {
156 return (CountStatistic) consumers.get(consumer);
157 }
158
159 public String[] getThreadConsumers() {
160 return (String[]) consumers.keySet().toArray(new String[consumers.size()]);
161 }
162
163 public void prepareConsumers(Map clients) {
164 Map result = new HashMap();
165 for (Iterator it = clients.keySet().iterator(); it.hasNext();) {
166 String client = (String) it.next();
167 Integer count = (Integer) clients.get(client);
168 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
169 if (stat == null) {
170 stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
171 addStat(stat.getName(), stat);
172 } else {
173 consumers.remove(client);
174 stat.setCount(count.intValue());
175 }
176 result.put(client, stat);
177 }
178 for (Iterator it = consumers.keySet().iterator(); it.hasNext();) {
179 String client = (String) it.next();
180 removeStat(((CountStatisticImpl) consumers.get(client)).getName());
181 }
182 consumers = result;
183 }
184 }
185
186
187 public int getPoolSize() {
188 return executor.getPoolSize();
189 }
190
191 public int getMaximumPoolSize() {
192 return executor.getMaximumPoolSize();
193 }
194
195 public int getActiveCount() {
196 return executor.getActiveCount();
197 }
198
199 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
200 return executor.awaitTermination(timeout, unit);
201 }
202
203 public void execute(Runnable command) {
204 execute("Unknown", command);
205 }
206
207 public void execute(final String consumerName, final Runnable runnable) {
208 Runnable command;
209 if (statsActive) {
210 command = new Runnable() {
211 public void run() {
212 startWork(consumerName);
213 try {
214 runnable.run();
215 } finally {
216 finishWork(consumerName);
217 }
218 }
219 };
220 } else {
221 command = runnable;
222 }
223
224 ThreadPoolExecutor p;
225 synchronized (this) {
226 p = executor;
227 }
228 if (p == null) {
229 throw new IllegalStateException("ThreadPool has been stopped");
230 }
231 Runnable task = new ContextClassLoaderRunnable(command, classLoader);
232 p.execute(task);
233 }
234
235 private synchronized void startWork(String consumerName) {
236 Integer test = (Integer) clients.get(consumerName);
237 if (test == null) {
238 clients.put(consumerName, new Integer(1));
239 } else {
240 clients.put(consumerName, new Integer(test.intValue() + 1));
241 }
242 }
243
244 private synchronized void finishWork(String consumerName) {
245 Integer test = (Integer) clients.get(consumerName);
246 if (test.intValue() == 1) {
247 clients.remove(consumerName);
248 } else {
249 clients.put(consumerName, new Integer(test.intValue() - 1));
250 }
251 }
252
253 private static class WaitWhenBlockedPolicy
254 implements RejectedExecutionHandler
255 {
256 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException {
257 try {
258 executor.getQueue().put(r);
259 }
260 catch (InterruptedException e) {
261 throw new RejectedExecutionException(e);
262 }
263 }
264 }
265
266 public void setWaitWhenBlocked(boolean wait) {
267 waitWhenBlocked = wait;
268 if(wait) {
269 executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
270 } else {
271 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
272 }
273 }
274
275 public boolean isWaitWhenBlocked() {
276 return waitWhenBlocked;
277 }
278
279 public void doStart() throws Exception {
280 }
281
282 public void doStop() throws Exception {
283 ThreadPoolExecutor p;
284 synchronized (this) {
285 p = executor;
286 executor = null;
287 classLoader = null;
288 }
289 if (p != null) {
290 p.shutdownNow();
291 }
292 }
293
294 public void doFail() {
295 try {
296 doStop();
297 } catch (Exception e) {
298 }
299 }
300
301 private static final class ThreadPoolThreadFactory implements ThreadFactory {
302 private final String poolName;
303 private final ClassLoader classLoader;
304
305 private int nextWorkerID = 0;
306
307 public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) {
308 this.poolName = poolName;
309 this.classLoader = classLoader;
310 }
311
312 public Thread newThread(Runnable arg0) {
313 Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID());
314 thread.setContextClassLoader(classLoader);
315 return thread;
316 }
317
318 private synchronized int getNextWorkerID() {
319 return nextWorkerID++;
320 }
321 }
322
323 private static final class ContextClassLoaderRunnable implements Runnable {
324 private Runnable task;
325 private ClassLoader classLoader;
326
327 public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) {
328 this.task = task;
329 this.classLoader = classLoader;
330 }
331
332 public void run() {
333 Runnable myTask = task;
334 ClassLoader myClassLoader = classLoader;
335
336 // clear fields so they can be garbage collected
337 task = null;
338 classLoader = null;
339
340 if (myClassLoader != null) {
341 // we asumme the thread classloader is already set to our final class loader
342 // because the only to access the thread is wrapped with the Runnable or via the initial thread pool
343 try {
344 myTask.run();
345 } finally {
346 Thread.currentThread().setContextClassLoader(myClassLoader);
347 }
348 }
349 }
350 }
351
352 public static final GBeanInfo GBEAN_INFO;
353
354 static {
355 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class, "GBean");
356
357 infoFactory.addAttribute("minPoolSize", int.class, true);
358 infoFactory.addAttribute("maxPoolSize", int.class, true);
359 infoFactory.addAttribute("poolName", String.class, true);
360 infoFactory.addAttribute("keepAliveTime", long.class, true);
361 infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
362
363 infoFactory.addAttribute("objectName", String.class, false);
364 infoFactory.addAttribute("classLoader", ClassLoader.class, false);
365
366 infoFactory.addInterface(GeronimoExecutor.class);
367
368 infoFactory.setConstructor(new String[]{"minPoolSize", "maxPoolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
369
370 GBEAN_INFO = infoFactory.getBeanInfo();
371 }
372
373 public static GBeanInfo getGBeanInfo() {
374 return GBEAN_INFO;
375 }
376 }