View Javadoc

1   /**
2    *
3    *  Licensed to the Apache Software Foundation (ASF) under one or more
4    *  contributor license agreements.  See the NOTICE file distributed with
5    *  this work for additional information regarding copyright ownership.
6    *  The ASF licenses this file to You under the Apache License, Version 2.0
7    *  (the "License"); you may not use this file except in compliance with
8    *  the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing, software
13   *  distributed under the License is distributed on an "AS IS" BASIS,
14   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *  See the License for the specific language governing permissions and
16   *  limitations under the License.
17   */
18  
19  package org.apache.geronimo.pool;
20  
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.Map;
25  
26  import javax.management.MalformedObjectNameException;
27  import javax.management.ObjectName;
28  import javax.management.j2ee.statistics.BoundedRangeStatistic;
29  import javax.management.j2ee.statistics.CountStatistic;
30  import javax.management.j2ee.statistics.Stats;
31  
32  import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
33  import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
34  import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
35  import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
36  import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
37  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
38  
39  import org.apache.geronimo.gbean.GBeanInfo;
40  import org.apache.geronimo.gbean.GBeanInfoBuilder;
41  import org.apache.geronimo.gbean.GBeanLifecycle;
42  
43  import org.apache.geronimo.management.J2EEManagedObject;
44  import org.apache.geronimo.management.StatisticsProvider;
45  import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
46  import org.apache.geronimo.management.stats.BoundedRangeImpl;
47  import org.apache.geronimo.management.stats.CountStatisticImpl;
48  import org.apache.geronimo.management.stats.StatsImpl;
49  
50  /**
51   * @version $Rev: 470597 $ $Date: 2006-11-02 15:30:55 -0800 (Thu, 02 Nov 2006) $
52   */
53  public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
54      private ThreadPoolExecutor executor;
55      private ClassLoader classLoader;
56      private ObjectName objectName;
57      private boolean waitWhenBlocked;
58      
59      // Statistics-related fields follow
60      private boolean statsActive = true;
61      private PoolStatsImpl stats = new PoolStatsImpl();
62      private Map clients = new HashMap();
63  
64      public ThreadPool(int poolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) {
65          ThreadPoolExecutor p = new ThreadPoolExecutor(
66              poolSize, // core size
67              poolSize, // max size
68              keepAliveTime, TimeUnit.MILLISECONDS,
69              new SynchronousQueue());
70  
71          p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
72          p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
73          
74          try {
75              this.objectName = ObjectName.getInstance(objectName);
76          } catch (MalformedObjectNameException e) {
77              throw new IllegalStateException("Bad object name injected: " + e.getMessage());
78          }
79  
80          executor = p;
81          this.classLoader = classLoader;
82      }
83  
84      public String getName() {
85          return objectName.getKeyProperty("name");
86      }
87  
88      public String getObjectName() {
89          return objectName.getCanonicalName();
90      }
91  
92      public boolean isEventProvider() {
93          return true;
94      }
95  
96      public boolean isStateManageable() {
97          return true;
98      }
99  
100     public boolean isStatisticsProvider() {
101         return true;
102     }
103 
104     public Stats getStats() {
105         stats.threadsInUse.setLowerBound(0);
106         stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
107         int inUse = executor.getPoolSize();
108         stats.threadsInUse.setCurrent(inUse);
109         if (inUse < stats.threadsInUse.getLowWaterMark()) {
110             stats.threadsInUse.setLowWaterMark(inUse);
111         }
112         if (inUse > stats.threadsInUse.getHighWaterMark()) {
113             stats.threadsInUse.setHighWaterMark(inUse);
114         }
115         if (statsActive) {
116             synchronized (this) {
117                 stats.prepareConsumers(clients);
118             }
119         } else {
120             stats.prepareConsumers(Collections.EMPTY_MAP);
121         }
122         return stats;
123     }
124 
125     public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
126         private BoundedRangeImpl threadsInUse = new BoundedRangeImpl("Threads In Use", "", "The number of threads in use by this thread pool");
127         private Map consumers = new HashMap();
128 
129         public PoolStatsImpl() {
130             addStat(threadsInUse.getName(), threadsInUse);
131         }
132 
133         public BoundedRangeStatistic getThreadsInUse() {
134             return threadsInUse;
135         }
136 
137         public CountStatistic getCountForConsumer(String consumer) {
138             return (CountStatistic) consumers.get(consumer);
139         }
140 
141         public String[] getThreadConsumers() {
142             return (String[]) consumers.keySet().toArray(new String[consumers.size()]);
143         }
144 
145         public void prepareConsumers(Map clients) {
146             Map result = new HashMap();
147             for (Iterator it = clients.keySet().iterator(); it.hasNext();) {
148                 String client = (String) it.next();
149                 Integer count = (Integer) clients.get(client);
150                 CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
151                 if (stat == null) {
152                     stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
153                     addStat(stat.getName(), stat);
154                 } else {
155                     consumers.remove(client);
156                     stat.setCount(count.intValue());
157                 }
158                 result.put(client, stat);
159             }
160             for (Iterator it = consumers.keySet().iterator(); it.hasNext();) {
161                 String client = (String) it.next();
162                 removeStat(((CountStatisticImpl) consumers.get(client)).getName());
163             }
164             consumers = result;
165         }
166     }
167 
168 
169     public int getPoolSize() {
170         return executor.getMaximumPoolSize();
171     }
172 
173     public void execute(Runnable command) {
174         execute("Unknown", command);
175     }
176 
177     public void execute(final String consumerName, final Runnable runnable) {
178         Runnable command;
179         if (statsActive) {
180             command = new Runnable() {
181                 public void run() {
182                     startWork(consumerName);
183                     try {
184                         runnable.run();
185                     } finally {
186                         finishWork(consumerName);
187                     }
188                 }
189             };
190         } else {
191             command = runnable;
192         }
193 
194         ThreadPoolExecutor p;
195         synchronized (this) {
196             p = executor;
197         }
198         if (p == null) {
199             throw new IllegalStateException("ThreadPool has been stopped");
200         }
201         Runnable task = new ContextClassLoaderRunnable(command, classLoader);
202         p.execute(task);
203     }
204 
205     private synchronized void startWork(String consumerName) {
206         Integer test = (Integer) clients.get(consumerName);
207         if (test == null) {
208             clients.put(consumerName, new Integer(1));
209         } else {
210             clients.put(consumerName, new Integer(test.intValue() + 1));
211         }
212     }
213 
214     private synchronized void finishWork(String consumerName) {
215         Integer test = (Integer) clients.get(consumerName);
216         if (test.intValue() == 1) {
217             clients.remove(consumerName);
218         } else {
219             clients.put(consumerName, new Integer(test.intValue() - 1));
220         }
221     }
222     
223     private static class WaitWhenBlockedPolicy
224         implements RejectedExecutionHandler
225     {
226         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException {
227             try {
228                 executor.getQueue().put(r);
229             }
230             catch (InterruptedException e) {
231                 throw new RejectedExecutionException(e);
232             }
233         }
234     }
235     
236     public void setWaitWhenBlocked(boolean wait) {
237         waitWhenBlocked = wait;
238         if(wait) {
239             executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
240         } else {
241             executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
242         }
243     }
244 
245     public boolean isWaitWhenBlocked() {
246         return waitWhenBlocked;
247     }
248 
249     public void doStart() throws Exception {
250     }
251 
252     public void doStop() throws Exception {
253         ThreadPoolExecutor p;
254         synchronized (this) {
255             p = executor;
256             executor = null;
257             classLoader = null;
258         }
259         if (p != null) {
260             p.shutdownNow();
261         }
262     }
263 
264     public void doFail() {
265         try {
266             doStop();
267         } catch (Exception e) {
268         }
269     }
270 
271     private static final class ThreadPoolThreadFactory implements ThreadFactory {
272         private final String poolName;
273         private final ClassLoader classLoader;
274 
275         private int nextWorkerID = 0;
276 
277         public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) {
278             this.poolName = poolName;
279             this.classLoader = classLoader;
280         }
281 
282         public Thread newThread(Runnable arg0) {
283             Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID());
284             thread.setContextClassLoader(classLoader);
285             return thread;
286         }
287 
288         private synchronized int getNextWorkerID() {
289             return nextWorkerID++;
290         }
291     }
292 
293     private static final class ContextClassLoaderRunnable implements Runnable {
294         private Runnable task;
295         private ClassLoader classLoader;
296 
297         public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) {
298             this.task = task;
299             this.classLoader = classLoader;
300         }
301 
302         public void run() {
303             Runnable myTask = task;
304             ClassLoader myClassLoader = classLoader;
305 
306             // clear fields so they can be garbage collected
307             task = null;
308             classLoader = null;
309 
310             if (myClassLoader != null) {
311                 // we asumme the thread classloader is already set to our final class loader
312                 // because the only to access the thread is wrapped with the Runnable or via the initial thread pool
313                 try {
314                     myTask.run();
315                 } finally {
316                     Thread.currentThread().setContextClassLoader(myClassLoader);
317                 }
318             }
319         }
320     }
321 
322     public static final GBeanInfo GBEAN_INFO;
323 
324     static {
325         GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class);
326 
327         infoFactory.addAttribute("poolSize", int.class, true);
328         infoFactory.addAttribute("poolName", String.class, true);
329         infoFactory.addAttribute("keepAliveTime", long.class, true);
330         infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
331 
332         infoFactory.addAttribute("objectName", String.class, false);
333         infoFactory.addAttribute("classLoader", ClassLoader.class, false);
334 
335         infoFactory.addInterface(GeronimoExecutor.class);
336 
337         infoFactory.setConstructor(new String[]{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
338 
339         GBEAN_INFO = infoFactory.getBeanInfo();
340     }
341 
342     public static GBeanInfo getGBeanInfo() {
343         return GBEAN_INFO;
344     }
345 }