001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.pool;
019    
020    import java.util.Collections;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.Map;
024    
025    import javax.management.MalformedObjectNameException;
026    import javax.management.ObjectName;
027    import javax.management.j2ee.statistics.BoundedRangeStatistic;
028    import javax.management.j2ee.statistics.CountStatistic;
029    import javax.management.j2ee.statistics.Stats;
030    
031    import java.util.concurrent.ThreadPoolExecutor;
032    import java.util.concurrent.RejectedExecutionHandler;
033    import java.util.concurrent.RejectedExecutionException;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.SynchronousQueue;
036    import java.util.concurrent.TimeUnit;
037    
038    import org.apache.geronimo.gbean.GBeanInfo;
039    import org.apache.geronimo.gbean.GBeanInfoBuilder;
040    import org.apache.geronimo.gbean.GBeanLifecycle;
041    
042    import org.apache.geronimo.management.J2EEManagedObject;
043    import org.apache.geronimo.management.StatisticsProvider;
044    import org.apache.geronimo.management.geronimo.stats.ThreadPoolStats;
045    import org.apache.geronimo.management.stats.BoundedRangeStatisticImpl;
046    import org.apache.geronimo.management.stats.CountStatisticImpl;
047    import org.apache.geronimo.management.stats.StatsImpl;
048    
049    /**
050     * @version $Rev: 706640 $ $Date: 2008-10-21 14:44:05 +0000 (Tue, 21 Oct 2008) $
051     */
052    public class ThreadPool implements GeronimoExecutor, GBeanLifecycle, J2EEManagedObject, StatisticsProvider {
053        private ThreadPoolExecutor executor;
054        private ClassLoader classLoader;
055        private ObjectName objectName;
056        private boolean waitWhenBlocked;
057        
058        // Statistics-related fields follow
059        private boolean statsActive = true;
060        private PoolStatsImpl stats = new PoolStatsImpl();
061        private Map clients = new HashMap();
062    
063        public ThreadPool(int minPoolSize, int maxPoolSize, String poolName, long keepAliveTime, ClassLoader classLoader, String objectName) {
064            ThreadPoolExecutor p = new ThreadPoolExecutor(
065                minPoolSize, // core size
066                maxPoolSize, // max size
067                keepAliveTime, TimeUnit.MILLISECONDS,
068                new SynchronousQueue());
069    
070            p.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
071            p.setThreadFactory(new ThreadPoolThreadFactory(poolName, classLoader));
072            
073            try {
074                this.objectName = ObjectName.getInstance(objectName);
075            } catch (MalformedObjectNameException e) {
076                throw new IllegalStateException("Bad object name injected: " + e.getMessage(), e);
077            }
078    
079            executor = p;
080            this.classLoader = classLoader;
081    
082            // set pool stats start time
083            stats.setStartTime();
084        }
085    
086        public String getName() {
087            return objectName.getKeyProperty("name");
088        }
089    
090        public String getObjectName() {
091            return objectName.getCanonicalName();
092        }
093    
094        public boolean isEventProvider() {
095            return true;
096        }
097    
098        public boolean isStateManageable() {
099            return true;
100        }
101    
102        public boolean isStatisticsProvider() {
103            return true;
104        }
105    
106        public Stats getStats() {
107            stats.threadsInUse.setLowerBound(0);
108            stats.threadsInUse.setUpperBound(executor.getMaximumPoolSize());
109            int inUse = executor.getPoolSize();
110            stats.threadsInUse.setCurrent(inUse);
111            if (inUse < stats.threadsInUse.getLowWaterMark()) {
112                stats.threadsInUse.setLowWaterMark(inUse);
113            }
114            if (inUse > stats.threadsInUse.getHighWaterMark()) {
115                stats.threadsInUse.setHighWaterMark(inUse);
116            }
117            if (statsActive) {
118                synchronized (this) {
119                    stats.prepareConsumers(clients);
120                }
121            } else {
122                stats.prepareConsumers(Collections.EMPTY_MAP);
123            }
124            // set last sapmle time
125            stats.setLastSampleTime();
126            return stats;
127        }
128    
129        /**
130         * Reset all statistics in PoolStatsImpl object
131         */
132        public void resetStats() {
133            stats.threadsInUse.setLowerBound(0);
134            stats.threadsInUse.setUpperBound(0);
135            stats.threadsInUse.setCurrent(0);
136            stats.threadsInUse.setLowWaterMark(0);
137            stats.threadsInUse.setHighWaterMark(0);
138            stats.setStartTime();
139        }
140    
141        public static class PoolStatsImpl extends StatsImpl implements ThreadPoolStats {
142            private BoundedRangeStatisticImpl threadsInUse = new BoundedRangeStatisticImpl(
143                    "Threads In Use", "",
144                    "The number of threads in use by this thread pool");
145            private Map consumers = new HashMap();
146    
147            public PoolStatsImpl() {
148                addStat(threadsInUse.getName(), threadsInUse);
149            }
150    
151            public BoundedRangeStatistic getThreadsInUse() {
152                return threadsInUse;
153            }
154    
155            public CountStatistic getCountForConsumer(String consumer) {
156                return (CountStatistic) consumers.get(consumer);
157            }
158    
159            public String[] getThreadConsumers() {
160                return (String[]) consumers.keySet().toArray(new String[consumers.size()]);
161            }
162    
163            public void prepareConsumers(Map clients) {
164                Map result = new HashMap();
165                for (Iterator it = clients.keySet().iterator(); it.hasNext();) {
166                    String client = (String) it.next();
167                    Integer count = (Integer) clients.get(client);
168                    CountStatisticImpl stat = (CountStatisticImpl) consumers.get(client);
169                    if (stat == null) {
170                        stat = new CountStatisticImpl("Threads for " + client, "", "The number of threads used by the client known as '" + client + "'", count.intValue());
171                        addStat(stat.getName(), stat);
172                    } else {
173                        consumers.remove(client);
174                        stat.setCount(count.intValue());
175                    }
176                    result.put(client, stat);
177                }
178                for (Iterator it = consumers.keySet().iterator(); it.hasNext();) {
179                    String client = (String) it.next();
180                    removeStat(((CountStatisticImpl) consumers.get(client)).getName());
181                }
182                consumers = result;
183            }
184        }
185    
186    
187        public int getPoolSize() {
188            return executor.getPoolSize();
189        }
190    
191        public int getMaximumPoolSize() {
192            return executor.getMaximumPoolSize();
193        }
194    
195        public int getActiveCount() {
196            return executor.getActiveCount();
197        }
198    
199        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
200            return executor.awaitTermination(timeout, unit);
201        }
202    
203        public void execute(Runnable command) {
204            execute("Unknown", command);
205        }
206    
207        public void execute(final String consumerName, final Runnable runnable) {
208            Runnable command;
209            if (statsActive) {
210                command = new Runnable() {
211                    public void run() {
212                        startWork(consumerName);
213                        try {
214                            runnable.run();
215                        } finally {
216                            finishWork(consumerName);
217                        }
218                    }
219                };
220            } else {
221                command = runnable;
222            }
223    
224            ThreadPoolExecutor p;
225            synchronized (this) {
226                p = executor;
227            }
228            if (p == null) {
229                throw new IllegalStateException("ThreadPool has been stopped");
230            }
231            Runnable task = new ContextClassLoaderRunnable(command, classLoader);
232            p.execute(task);
233        }
234    
235        private synchronized void startWork(String consumerName) {
236            Integer test = (Integer) clients.get(consumerName);
237            if (test == null) {
238                clients.put(consumerName, new Integer(1));
239            } else {
240                clients.put(consumerName, new Integer(test.intValue() + 1));
241            }
242        }
243    
244        private synchronized void finishWork(String consumerName) {
245            Integer test = (Integer) clients.get(consumerName);
246            if (test.intValue() == 1) {
247                clients.remove(consumerName);
248            } else {
249                clients.put(consumerName, new Integer(test.intValue() - 1));
250            }
251        }
252        
253        private static class WaitWhenBlockedPolicy
254            implements RejectedExecutionHandler
255        {
256            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) throws RejectedExecutionException {
257                try {
258                    executor.getQueue().put(r);
259                }
260                catch (InterruptedException e) {
261                    throw new RejectedExecutionException(e);
262                }
263            }
264        }
265        
266        public void setWaitWhenBlocked(boolean wait) {
267            waitWhenBlocked = wait;
268            if(wait) {
269                executor.setRejectedExecutionHandler(new WaitWhenBlockedPolicy());
270            } else {
271                executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
272            }
273        }
274    
275        public boolean isWaitWhenBlocked() {
276            return waitWhenBlocked;
277        }
278    
279        public void doStart() throws Exception {
280        }
281    
282        public void doStop() throws Exception {
283            ThreadPoolExecutor p;
284            synchronized (this) {
285                p = executor;
286                executor = null;
287                classLoader = null;
288            }
289            if (p != null) {
290                p.shutdownNow();
291            }
292        }
293    
294        public void doFail() {
295            try {
296                doStop();
297            } catch (Exception e) {
298            }
299        }
300    
301        private static final class ThreadPoolThreadFactory implements ThreadFactory {
302            private final String poolName;
303            private final ClassLoader classLoader;
304    
305            private int nextWorkerID = 0;
306    
307            public ThreadPoolThreadFactory(String poolName, ClassLoader classLoader) {
308                this.poolName = poolName;
309                this.classLoader = classLoader;
310            }
311    
312            public Thread newThread(Runnable arg0) {
313                Thread thread = new Thread(arg0, poolName + " " + getNextWorkerID());
314                thread.setContextClassLoader(classLoader);
315                return thread;
316            }
317    
318            private synchronized int getNextWorkerID() {
319                return nextWorkerID++;
320            }
321        }
322    
323        private static final class ContextClassLoaderRunnable implements Runnable {
324            private Runnable task;
325            private ClassLoader classLoader;
326    
327            public ContextClassLoaderRunnable(Runnable task, ClassLoader classLoader) {
328                this.task = task;
329                this.classLoader = classLoader;
330            }
331    
332            public void run() {
333                Runnable myTask = task;
334                ClassLoader myClassLoader = classLoader;
335    
336                // clear fields so they can be garbage collected
337                task = null;
338                classLoader = null;
339    
340                if (myClassLoader != null) {
341                    // we asumme the thread classloader is already set to our final class loader
342                    // because the only to access the thread is wrapped with the Runnable or via the initial thread pool
343                    try {
344                        myTask.run();
345                    } finally {
346                        Thread.currentThread().setContextClassLoader(myClassLoader);
347                    }
348                }
349            }
350        }
351    
352        public static final GBeanInfo GBEAN_INFO;
353    
354        static {
355            GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic(ThreadPool.class, "GBean");
356    
357            infoFactory.addAttribute("minPoolSize", int.class, true);
358            infoFactory.addAttribute("maxPoolSize", int.class, true);
359            infoFactory.addAttribute("poolName", String.class, true);
360            infoFactory.addAttribute("keepAliveTime", long.class, true);
361            infoFactory.addAttribute("waitWhenBlocked", boolean.class, true);
362    
363            infoFactory.addAttribute("objectName", String.class, false);
364            infoFactory.addAttribute("classLoader", ClassLoader.class, false);
365    
366            infoFactory.addInterface(GeronimoExecutor.class);
367    
368            infoFactory.setConstructor(new String[]{"minPoolSize", "maxPoolSize", "poolName", "keepAliveTime", "classLoader", "objectName"});
369    
370            GBEAN_INFO = infoFactory.getBeanInfo();
371        }
372    
373        public static GBeanInfo getGBeanInfo() {
374            return GBEAN_INFO;
375        }
376    }