001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.pool;
019
020 import java.util.Collections;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.management.MalformedObjectNameException;
026 import javax.management.ObjectName;
027 import javax.management.j2ee.statistics.BoundedRangeStatistic;
028 import javax.management.j2ee.statistics.CountStatistic;
029 import javax.management.j2ee.statistics.Stats;
030
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.RejectedExecutionHandler;
033 import java.util.concurrent.RejectedExecutionException;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.SynchronousQueue;
036 import java.util.concurrent.TimeUnit;
037
038 import org.apache.geronimo.gbean.GBeanInfo;
039 import org.apache.geronimo.gbean.GBeanInfoBuilder;
040 import org.apache.geronimo.gbean.GBeanLifecycle;
041
042 import org.apache.geronimo.management.J2EEManagedObject;
043 import org.apache.geronimo.management.StatisticsProvider;
044 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
045 import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl;
046 import org.apache.geronimo.management.stats.CountStatisticImpl;
047 import org.apache.geronimo.management.stats.StatsImpl;
048
049 /**
050 * @version $Rev: 549455 $ $Date: 2007-06-21 08:12:27 -0400 (Thu, 21 Jun 2007) $
051 */
052 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
053 private ThreadPoolExecutor executor;
054 private ClassLoader classLoader;
055 private ObjectName objectName;
056 private boolean waitWhenBlocked;
057
058 // Statistics-related fields follow
059 private boolean statsActive = true;
060 private PoolStatsImpl stats = new PoolStatsImpl();
061 private Map clients = new HashMap();
062
063 public ThreadPool(int poolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) {
064 ThreadPoolExecutor p = new ThreadPoolExecutor(
065 poolSize, // core size
066 poolSize, // max size
067 keepAliveTime, TimeUnit.MILLISECONDS,
068 new SynchronousQueue());
069
070 p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
071 p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
072
073 try {
074 this.objectName = ObjectName.getInstance(objectName);
075 } catch (MalformedObjectNameException e) {
076 throw new IllegalStateException("Bad object name injected: " + e.getMessage(), e);
077 }
078
079 executor = p;
080 this.classLoader = classLoader;
081 }
082
083 public String getName() {
084 return objectName.getKeyProperty("name");
085 }
086
087 public String getObjectName() {
088 return objectName.getCanonicalName();
089 }
090
091 public boolean isEventProvider() {
092 return true;
093 }
094
095 public boolean isStateManageable() {
096 return true;
097 }
098
099 public boolean isStatisticsProvider() {
100 return true;
101 }
102
103 public Stats getStats() {
104 stats.threadsInUse.setLowerBound(0);
105 stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
106 int inUse = executor.getPoolSize();
107 stats.threadsInUse.setCurrent(inUse);
108 if (inUse < stats.threadsInUse.getLowWaterMark()) {
109 stats.threadsInUse.setLowWaterMark(inUse);
110 }
111 if (inUse > stats.threadsInUse.getHighWaterMark()) {
112 stats.threadsInUse.setHighWaterMark(inUse);
113 }
114 if (statsActive) {
115 synchronized (this) {
116 stats.prepareConsumers(clients);
117 }
118 } else {
119 stats.prepareConsumers(Collections.EMPTY_MAP);
120 }
121 return stats;
122 }
123
124 public void resetStats() {
125 // TODO
126 }
127
128 public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
129 private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl(
130 "Threads In Use", "",
131 "The number of threads in use by this thread pool");
132 private Map consumers = new HashMap();
133
134 public PoolStatsImpl() {
135 addStat(threadsInUse.getName(), threadsInUse);
136 }
137
138 public BoundedRangeStatistic getThreadsInUse() {
139 return threadsInUse;
140 }
141
142 public CountStatistic getCountForConsumer(String consumer) {
143 return (CountStatistic) consumers.get(consumer);
144 }
145
146 public String[] getThreadConsumers() {
147 return (String[]) consumers.keySet().toArray(new String[consumers.size()]);
148 }
149
150 public void prepareConsumers(Map clients) {
151 Map result = new HashMap();
152 for (Iterator it = clients.keySet().iterator(); it.hasNext();) {
153 String client = (String) it.next();
154 Integer count = (Integer) clients.get(client);
155 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
156 if (stat == null) {
157 stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
158 addStat(stat.getName(), stat);
159 } else {
160 consumers.remove(client);
161 stat.setCount(count.intValue());
162 }
163 result.put(client, stat);
164 }
165 for (Iterator it = consumers.keySet().iterator(); it.hasNext();) {
166 String client = (String) it.next();
167 removeStat(((CountStatisticImpl) consumers.get(client)).getName());
168 }
169 consumers = result;
170 }
171 }
172
173
174 public int getPoolSize() {
175 return executor.getPoolSize();
176 }
177
178 public int getMaximumPoolSize() {
179 return executor.getMaximumPoolSize();
180 }
181
182 public int getActiveCount() {
183 return executor.getActiveCount();
184 }
185
186 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
187 return executor.awaitTermination(timeout, unit);
188 }
189
190 public void execute(Runnable command) {
191 execute("Unknown", command);
192 }
193
194 public void execute(final String consumerName, final Runnable runnable) {
195 Runnable command;
196 if (statsActive) {
197 command = new Runnable() {
198 public void run() {
199 startWork(consumerName);
200 try {
201 runnable.run();
202 } finally {
203 finishWork(consumerName);
204 }
205 }
206 };
207 } else {
208 command = runnable;
209 }
210
211 ThreadPoolExecutor p;
212 synchronized (this) {
213 p = executor;
214 }
215 if (p == null) {
216 throw new IllegalStateException("ThreadPool has been stopped");
217 }
218 Runnable task = new ContextClassLoaderRunnable(command, classLoader);
219 p.execute(task);
220 }
221
222 private synchronized void startWork(String consumerName) {
223 Integer test = (Integer) clients.get(consumerName);
224 if (test == null) {
225 clients.put(consumerName, new Integer(1));
226 } else {
227 clients.put(consumerName, new Integer(test.intValue() + 1));
228 }
229 }
230
231 private synchronized void finishWork(String consumerName) {
232 Integer test = (Integer) clients.get(consumerName);
233 if (test.intValue() == 1) {
234 clients.remove(consumerName);
235 } else {
236 clients.put(consumerName, new Integer(test.intValue() - 1));
237 }
238 }
239
240 private static class WaitWhenBlockedPolicy
241 implements RejectedExecutionHandler
242 {
243 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException {
244 try {
245 executor.getQueue().put(r);
246 }
247 catch (InterruptedException e) {
248 throw new RejectedExecutionException(e);
249 }
250 }
251 }
252
253 public void setWaitWhenBlocked(boolean wait) {
254 waitWhenBlocked = wait;
255 if(wait) {
256 executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
257 } else {
258 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
259 }
260 }
261
262 public boolean isWaitWhenBlocked() {
263 return waitWhenBlocked;
264 }
265
266 public void doStart() throws Exception {
267 }
268
269 public void doStop() throws Exception {
270 ThreadPoolExecutor p;
271 synchronized (this) {
272 p = executor;
273 executor = null;
274 classLoader = null;
275 }
276 if (p != null) {
277 p.shutdownNow();
278 }
279 }
280
281 public void doFail() {
282 try {
283 doStop();
284 } catch (Exception e) {
285 }
286 }
287
288 private static final class ThreadPoolThreadFactory implements ThreadFactory {
289 private final String poolName;
290 private final ClassLoader classLoader;
291
292 private int nextWorkerID = 0;
293
294 public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) {
295 this.poolName = poolName;
296 this.classLoader = classLoader;
297 }
298
299 public Thread newThread(Runnable arg0) {
300 Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID());
301 thread.setContextClassLoader(classLoader);
302 return thread;
303 }
304
305 private synchronized int getNextWorkerID() {
306 return nextWorkerID++;
307 }
308 }
309
310 private static final class ContextClassLoaderRunnable implements Runnable {
311 private Runnable task;
312 private ClassLoader classLoader;
313
314 public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) {
315 this.task = task;
316 this.classLoader = classLoader;
317 }
318
319 public void run() {
320 Runnable myTask = task;
321 ClassLoader myClassLoader = classLoader;
322
323 // clear fields so they can be garbage collected
324 task = null;
325 classLoader = null;
326
327 if (myClassLoader != null) {
328 // we asumme the thread classloader is already set to our final class loader
329 // because the only to access the thread is wrapped with the Runnable or via the initial thread pool
330 try {
331 myTask.run();
332 } finally {
333 Thread.currentThread().setContextClassLoader(myClassLoader);
334 }
335 }
336 }
337 }
338
339 public static final GBeanInfo GBEAN_INFO;
340
341 static {
342 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class, "GBean");
343
344 infoFactory.addAttribute("poolSize", int.class, true);
345 infoFactory.addAttribute("poolName", String.class, true);
346 infoFactory.addAttribute("keepAliveTime", long.class, true);
347 infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
348
349 infoFactory.addAttribute("objectName", String.class, false);
350 infoFactory.addAttribute("classLoader", ClassLoader.class, false);
351
352 infoFactory.addInterface(GeronimoExecutor.class);
353
354 infoFactory.setConstructor(new String[]{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
355
356 GBEAN_INFO = infoFactory.getBeanInfo();
357 }
358
359 public static GBeanInfo getGBeanInfo() {
360 return GBEAN_INFO;
361 }
362 }