1 /**
2 *
3 * Licensed to the Apache Software Foundation (ASF) under one or more
4 * contributor license agreements. See the NOTICE file distributed with
5 * this work for additional information regarding copyright ownership.
6 * The ASF licenses this file to You under the Apache License, Version 2.0
7 * (the "License"); you may not use this file except in compliance with
8 * the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.geronimo.pool;
20
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25
26 import javax.management.MalformedObjectNameException;
27 import javax.management.ObjectName;
28 import javax.management.j2ee.statistics.BoundedRangeStatistic;
29 import javax.management.j2ee.statistics.CountStatistic;
30 import javax.management.j2ee.statistics.Stats;
31
32 import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
33 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
34 import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
35 import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
36 import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
37 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
38
39 import org.apache.geronimo.gbean.GBeanInfo;
40 import org.apache.geronimo.gbean.GBeanInfoBuilder;
41 import org.apache.geronimo.gbean.GBeanLifecycle;
42
43 import org.apache.geronimo.management.J2EEManagedObject;
44 import org.apache.geronimo.management.StatisticsProvider;
45 import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
46 import org.apache.geronimo.management.stats.BoundedRangeImpl;
47 import org.apache.geronimo.management.stats.CountStatisticImpl;
48 import org.apache.geronimo.management.stats.StatsImpl;
49
50 /**
51 * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
52 */
53 public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
54 private ThreadPoolExecutor executor;
55 private ClassLoader classLoader;
56 private ObjectName objectName;
57 private boolean waitWhenBlocked;
58
59
60 private boolean statsActive = true;
61 private PoolStatsImpl stats = new PoolStatsImpl();
62 private Map clients = new HashMap();
63
64 public ThreadPool(int poolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) {
65 ThreadPoolExecutor p = new ThreadPoolExecutor(
66 poolSize,
67 poolSize,
68 keepAliveTime, TimeUnit.MILLISECONDS,
69 new SynchronousQueue());
70
71 p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
72 p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
73
74 try {
75 this.objectName = ObjectName.getInstance(objectName);
76 } catch (MalformedObjectNameException e) {
77 throw new IllegalStateException("Bad object name injected: " + e.getMessage());
78 }
79
80 executor = p;
81 this.classLoader = classLoader;
82 }
83
84 public String getName() {
85 return objectName.getKeyProperty("name");
86 }
87
88 public String getObjectName() {
89 return objectName.getCanonicalName();
90 }
91
92 public boolean isEventProvider() {
93 return true;
94 }
95
96 public boolean isStateManageable() {
97 return true;
98 }
99
100 public boolean isStatisticsProvider() {
101 return true;
102 }
103
104 public Stats getStats() {
105 stats.threadsInUse.setLowerBound(0);
106 stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
107 int inUse = executor.getPoolSize();
108 stats.threadsInUse.setCurrent(inUse);
109 if (inUse < stats.threadsInUse.getLowWaterMark()) {
110 stats.threadsInUse.setLowWaterMark(inUse);
111 }
112 if (inUse > stats.threadsInUse.getHighWaterMark()) {
113 stats.threadsInUse.setHighWaterMark(inUse);
114 }
115 if (statsActive) {
116 synchronized (this) {
117 stats.prepareConsumers(clients);
118 }
119 } else {
120 stats.prepareConsumers(Collections.EMPTY_MAP);
121 }
122 return stats;
123 }
124
125 public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
126 private BoundedRangeImpl threadsInUse = new BoundedRangeImpl("Threads In Use", "", "The number of threads in use by this thread pool");
127 private Map consumers = new HashMap();
128
129 public PoolStatsImpl() {
130 addStat(threadsInUse.getName(), threadsInUse);
131 }
132
133 public BoundedRangeStatistic getThreadsInUse() {
134 return threadsInUse;
135 }
136
137 public CountStatistic getCountForConsumer(String consumer) {
138 return (CountStatistic) consumers.get(consumer);
139 }
140
141 public String[] getThreadConsumers() {
142 return (String[]) consumers.keySet().toArray(new String[consumers.size()]);
143 }
144
145 public void prepareConsumers(Map clients) {
146 Map result = new HashMap();
147 for (Iterator it = clients.keySet().iterator(); it.hasNext();) {
148 String client = (String) it.next();
149 Integer count = (Integer) clients.get(client);
150 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
151 if (stat == null) {
152 stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
153 addStat(stat.getName(), stat);
154 } else {
155 consumers.remove(client);
156 stat.setCount(count.intValue());
157 }
158 result.put(client, stat);
159 }
160 for (Iterator it = consumers.keySet().iterator(); it.hasNext();) {
161 String client = (String) it.next();
162 removeStat(((CountStatisticImpl) consumers.get(client)).getName());
163 }
164 consumers = result;
165 }
166 }
167
168
169 public int getPoolSize() {
170 return executor.getMaximumPoolSize();
171 }
172
173 public void execute(Runnable command) {
174 execute("Unknown", command);
175 }
176
177 public void execute(final String consumerName, final Runnable runnable) {
178 Runnable command;
179 if (statsActive) {
180 command = new Runnable() {
181 public void run() {
182 startWork(consumerName);
183 try {
184 runnable.run();
185 } finally {
186 finishWork(consumerName);
187 }
188 }
189 };
190 } else {
191 command = runnable;
192 }
193
194 ThreadPoolExecutor p;
195 synchronized (this) {
196 p = executor;
197 }
198 if (p == null) {
199 throw new IllegalStateException("ThreadPool has been stopped");
200 }
201 Runnable task = new ContextClassLoaderRunnable(command, classLoader);
202 p.execute(task);
203 }
204
205 private synchronized void startWork(String consumerName) {
206 Integer test = (Integer) clients.get(consumerName);
207 if (test == null) {
208 clients.put(consumerName, new Integer(1));
209 } else {
210 clients.put(consumerName, new Integer(test.intValue() + 1));
211 }
212 }
213
214 private synchronized void finishWork(String consumerName) {
215 Integer test = (Integer) clients.get(consumerName);
216 if (test.intValue() == 1) {
217 clients.remove(consumerName);
218 } else {
219 clients.put(consumerName, new Integer(test.intValue() - 1));
220 }
221 }
222
223 private static class WaitWhenBlockedPolicy
224 implements RejectedExecutionHandler
225 {
226 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException {
227 try {
228 executor.getQueue().put(r);
229 }
230 catch (InterruptedException e) {
231 throw new RejectedExecutionException(e);
232 }
233 }
234 }
235
236 public void setWaitWhenBlocked(boolean wait) {
237 waitWhenBlocked = wait;
238 if(wait) {
239 executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
240 } else {
241 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
242 }
243 }
244
245 public boolean isWaitWhenBlocked() {
246 return waitWhenBlocked;
247 }
248
249 public void doStart() throws Exception {
250 }
251
252 public void doStop() throws Exception {
253 ThreadPoolExecutor p;
254 synchronized (this) {
255 p = executor;
256 executor = null;
257 classLoader = null;
258 }
259 if (p != null) {
260 p.shutdownNow();
261 }
262 }
263
264 public void doFail() {
265 try {
266 doStop();
267 } catch (Exception e) {
268 }
269 }
270
271 private static final class ThreadPoolThreadFactory implements ThreadFactory {
272 private final String poolName;
273 private final ClassLoader classLoader;
274
275 private int nextWorkerID = 0;
276
277 public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) {
278 this.poolName = poolName;
279 this.classLoader = classLoader;
280 }
281
282 public Thread newThread(Runnable arg0) {
283 Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID());
284 thread.setContextClassLoader(classLoader);
285 return thread;
286 }
287
288 private synchronized int getNextWorkerID() {
289 return nextWorkerID++;
290 }
291 }
292
293 private static final class ContextClassLoaderRunnable implements Runnable {
294 private Runnable task;
295 private ClassLoader classLoader;
296
297 public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) {
298 this.task = task;
299 this.classLoader = classLoader;
300 }
301
302 public void run() {
303 Runnable myTask = task;
304 ClassLoader myClassLoader = classLoader;
305
306
307 task = null;
308 classLoader = null;
309
310 if (myClassLoader != null) {
311
312
313 try {
314 myTask.run();
315 } finally {
316 Thread.currentThread().setContextClassLoader(myClassLoader);
317 }
318 }
319 }
320 }
321
322 public static final GBeanInfo GBEAN_INFO;
323
324 static {
325 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class);
326
327 infoFactory.addAttribute("poolSize", int.class, true);
328 infoFactory.addAttribute("poolName", String.class, true);
329 infoFactory.addAttribute("keepAliveTime", long.class, true);
330 infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
331
332 infoFactory.addAttribute("objectName", String.class, false);
333 infoFactory.addAttribute("classLoader", ClassLoader.class, false);
334
335 infoFactory.addInterface(GeronimoExecutor.class);
336
337 infoFactory.setConstructor(new String[]{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
338
339 GBEAN_INFO = infoFactory.getBeanInfo();
340 }
341
342 public static GBeanInfo getGBeanInfo() {
343 return GBEAN_INFO;
344 }
345 }