001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.pool;
019    
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.management.MalformedObjectNameException;
026    import javax.management.ObjectName;
027    import javax.management.j2ee.statistics.BoundedRangeStatistic;
028    import javax.management.j2ee.statistics.CountStatistic;
029    import javax.management.j2ee.statistics.Stats;
030    
031    import java.util.concurrent.ThreadPoolExecutor;
032    import java.util.concurrent.RejectedExecutionHandler;
033    import java.util.concurrent.RejectedExecutionException;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.SynchronousQueue;
036    import java.util.concurrent.TimeUnit;
037    
038    import org.apache.geronimo.gbean.GBeanInfo;
039    import org.apache.geronimo.gbean.GBeanInfoBuilder;
040    import org.apache.geronimo.gbean.GBeanLifecycle;
041    
042    import org.apache.geronimo.management.J2EEManagedObject;
043    import org.apache.geronimo.management.StatisticsProvider;
044    import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
045    import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl;
046    import org.apache.geronimo.management.stats.CountStatisticImpl;
047    import org.apache.geronimo.management.stats.StatsImpl;
048    
049    /**
050     * @version $Rev: 549455 $ $Date: 2007-06-21 08:12:27 -0400 (Thu, 21 Jun 2007) $
051     */
052    public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
053        private ThreadPoolExecutor executor;
054        private ClassLoader classLoader;
055        private ObjectName objectName;
056        private boolean waitWhenBlocked;
057        
058        // Statistics-related fields follow
059        private boolean statsActive = true;
060        private PoolStatsImpl stats = new PoolStatsImpl();
061        private Map clients = new HashMap();
062    
063        public ThreadPool(int poolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) {
064            ThreadPoolExecutor p = new ThreadPoolExecutor(
065                poolSize, // core size
066                poolSize, // max size
067                keepAliveTime, TimeUnit.MILLISECONDS,
068                new SynchronousQueue());
069    
070            p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
071            p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
072            
073            try {
074                this.objectName = ObjectName.getInstance(objectName);
075            } catch (MalformedObjectNameException e) {
076                throw new IllegalStateException("Bad object name injected: " + e.getMessage(), e);
077            }
078    
079            executor = p;
080            this.classLoader = classLoader;
081        }
082    
083        public String getName() {
084            return objectName.getKeyProperty("name");
085        }
086    
087        public String getObjectName() {
088            return objectName.getCanonicalName();
089        }
090    
091        public boolean isEventProvider() {
092            return true;
093        }
094    
095        public boolean isStateManageable() {
096            return true;
097        }
098    
099        public boolean isStatisticsProvider() {
100            return true;
101        }
102    
103        public Stats getStats() {
104            stats.threadsInUse.setLowerBound(0);
105            stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
106            int inUse = executor.getPoolSize();
107            stats.threadsInUse.setCurrent(inUse);
108            if (inUse < stats.threadsInUse.getLowWaterMark()) {
109                stats.threadsInUse.setLowWaterMark(inUse);
110            }
111            if (inUse > stats.threadsInUse.getHighWaterMark()) {
112                stats.threadsInUse.setHighWaterMark(inUse);
113            }
114            if (statsActive) {
115                synchronized (this) {
116                    stats.prepareConsumers(clients);
117                }
118            } else {
119                stats.prepareConsumers(Collections.EMPTY_MAP);
120            }
121            return stats;
122        }
123    
124        public void resetStats() {
125            // TODO
126        }
127    
128        public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
129            private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl(
130                    "Threads In Use", "",
131                    "The number of threads in use by this thread pool");
132            private Map consumers = new HashMap();
133    
134            public PoolStatsImpl() {
135                addStat(threadsInUse.getName(), threadsInUse);
136            }
137    
138            public BoundedRangeStatistic getThreadsInUse() {
139                return threadsInUse;
140            }
141    
142            public CountStatistic getCountForConsumer(String consumer) {
143                return (CountStatistic) consumers.get(consumer);
144            }
145    
146            public String[] getThreadConsumers() {
147                return (String[]) consumers.keySet().toArray(new String[consumers.size()]);
148            }
149    
150            public void prepareConsumers(Map clients) {
151                Map result = new HashMap();
152                for (Iterator it = clients.keySet().iterator(); it.hasNext();) {
153                    String client = (String) it.next();
154                    Integer count = (Integer) clients.get(client);
155                    CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
156                    if (stat == null) {
157                        stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
158                        addStat(stat.getName(), stat);
159                    } else {
160                        consumers.remove(client);
161                        stat.setCount(count.intValue());
162                    }
163                    result.put(client, stat);
164                }
165                for (Iterator it = consumers.keySet().iterator(); it.hasNext();) {
166                    String client = (String) it.next();
167                    removeStat(((CountStatisticImpl) consumers.get(client)).getName());
168                }
169                consumers = result;
170            }
171        }
172    
173    
174        public int getPoolSize() {
175            return executor.getPoolSize();
176        }
177    
178        public int getMaximumPoolSize() {
179            return executor.getMaximumPoolSize();
180        }
181    
182        public int getActiveCount() {
183            return executor.getActiveCount();
184        }
185    
186        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
187            return executor.awaitTermination(timeout, unit);
188        }
189    
190        public void execute(Runnable command) {
191            execute("Unknown", command);
192        }
193    
194        public void execute(final String consumerName, final Runnable runnable) {
195            Runnable command;
196            if (statsActive) {
197                command = new Runnable() {
198                    public void run() {
199                        startWork(consumerName);
200                        try {
201                            runnable.run();
202                        } finally {
203                            finishWork(consumerName);
204                        }
205                    }
206                };
207            } else {
208                command = runnable;
209            }
210    
211            ThreadPoolExecutor p;
212            synchronized (this) {
213                p = executor;
214            }
215            if (p == null) {
216                throw new IllegalStateException("ThreadPool has been stopped");
217            }
218            Runnable task = new ContextClassLoaderRunnable(command, classLoader);
219            p.execute(task);
220        }
221    
222        private synchronized void startWork(String consumerName) {
223            Integer test = (Integer) clients.get(consumerName);
224            if (test == null) {
225                clients.put(consumerName, new Integer(1));
226            } else {
227                clients.put(consumerName, new Integer(test.intValue() + 1));
228            }
229        }
230    
231        private synchronized void finishWork(String consumerName) {
232            Integer test = (Integer) clients.get(consumerName);
233            if (test.intValue() == 1) {
234                clients.remove(consumerName);
235            } else {
236                clients.put(consumerName, new Integer(test.intValue() - 1));
237            }
238        }
239        
240        private static class WaitWhenBlockedPolicy
241            implements RejectedExecutionHandler
242        {
243            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException {
244                try {
245                    executor.getQueue().put(r);
246                }
247                catch (InterruptedException e) {
248                    throw new RejectedExecutionException(e);
249                }
250            }
251        }
252        
253        public void setWaitWhenBlocked(boolean wait) {
254            waitWhenBlocked = wait;
255            if(wait) {
256                executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
257            } else {
258                executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
259            }
260        }
261    
262        public boolean isWaitWhenBlocked() {
263            return waitWhenBlocked;
264        }
265    
266        public void doStart() throws Exception {
267        }
268    
269        public void doStop() throws Exception {
270            ThreadPoolExecutor p;
271            synchronized (this) {
272                p = executor;
273                executor = null;
274                classLoader = null;
275            }
276            if (p != null) {
277                p.shutdownNow();
278            }
279        }
280    
281        public void doFail() {
282            try {
283                doStop();
284            } catch (Exception e) {
285            }
286        }
287    
288        private static final class ThreadPoolThreadFactory implements ThreadFactory {
289            private final String poolName;
290            private final ClassLoader classLoader;
291    
292            private int nextWorkerID = 0;
293    
294            public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) {
295                this.poolName = poolName;
296                this.classLoader = classLoader;
297            }
298    
299            public Thread newThread(Runnable arg0) {
300                Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID());
301                thread.setContextClassLoader(classLoader);
302                return thread;
303            }
304    
305            private synchronized int getNextWorkerID() {
306                return nextWorkerID++;
307            }
308        }
309    
310        private static final class ContextClassLoaderRunnable implements Runnable {
311            private Runnable task;
312            private ClassLoader classLoader;
313    
314            public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) {
315                this.task = task;
316                this.classLoader = classLoader;
317            }
318    
319            public void run() {
320                Runnable myTask = task;
321                ClassLoader myClassLoader = classLoader;
322    
323                // clear fields so they can be garbage collected
324                task = null;
325                classLoader = null;
326    
327                if (myClassLoader != null) {
328                    // we asumme the thread classloader is already set to our final class loader
329                    // because the only to access the thread is wrapped with the Runnable or via the initial thread pool
330                    try {
331                        myTask.run();
332                    } finally {
333                        Thread.currentThread().setContextClassLoader(myClassLoader);
334                    }
335                }
336            }
337        }
338    
339        public static final GBeanInfo GBEAN_INFO;
340    
341        static {
342            GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class, "GBean");
343    
344            infoFactory.addAttribute("poolSize", int.class, true);
345            infoFactory.addAttribute("poolName", String.class, true);
346            infoFactory.addAttribute("keepAliveTime", long.class, true);
347            infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
348    
349            infoFactory.addAttribute("objectName", String.class, false);
350            infoFactory.addAttribute("classLoader", ClassLoader.class, false);
351    
352            infoFactory.addInterface(GeronimoExecutor.class);
353    
354            infoFactory.setConstructor(new String[]{"poolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
355    
356            GBEAN_INFO = infoFactory.getBeanInfo();
357        }
358    
359        public static GBeanInfo getGBeanInfo() {
360            return GBEAN_INFO;
361        }
362    }