001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.console.core.jms;
019    
020    import java.util.ArrayList;
021    import java.util.List;
022    
023    import javax.jms.Message;
024    import javax.jms.QueueSession;
025    import javax.jms.Topic;
026    import javax.jms.TopicConnection;
027    import javax.jms.TopicConnectionFactory;
028    import javax.jms.TopicSession;
029    import javax.jms.TopicSubscriber;
030    import javax.management.MalformedObjectNameException;
031    import javax.management.ObjectName;
032    
033    import org.apache.commons.logging.Log;
034    import org.apache.commons.logging.LogFactory;
035    import org.apache.geronimo.connector.AdminObjectWrapper;
036    import org.apache.geronimo.gbean.GBeanInfo;
037    import org.apache.geronimo.gbean.GBeanInfoBuilder;
038    import org.apache.geronimo.gbean.GBeanLifecycle;
039    import org.apache.geronimo.gbean.WaitingException;
040    import org.apache.geronimo.kernel.Kernel;
041    import org.apache.geronimo.kernel.KernelRegistry;
042    import org.apache.geronimo.kernel.management.State;
043    
044    public class TopicBrowserGBean implements GBeanLifecycle, Runnable {
045    
046        private static Log log = LogFactory.getLog(TopicBrowserGBean.class);
047    
048        private static Kernel kernel = KernelRegistry.getSingleKernel();
049    
050        static {
051            try {
052                ACTIVEMQ_CONTAINER_OBJNAME = ObjectName
053                        .getInstance("geronimo.server:J2EEApplication=null,J2EEModule=org/apache/geronimo/ActiveMQServer,J2EEServer=geronimo,j2eeType=JMSServer,name=ActiveMQl");
054                ACTIVEMQ_CONNECTOR_OBJNAME = ObjectName
055                        .getInstance("geronimo.server:J2EEApplication=null,J2EEServer=geronimo,JCAResource=org/apache/geronimo/SystemJMS,j2eeType=JCAManagedConnectionFactory,name=DefaultActiveMQConnectionFactory");
056            } catch (MalformedObjectNameException moe) {
057                log.warn("Could not initialize ObjectName", moe);
058            }
059        }
060    
061        private static ObjectName ACTIVEMQ_CONTAINER_OBJNAME;
062    
063        private static ObjectName ACTIVEMQ_CONNECTOR_OBJNAME;
064    
065        String subscriberName;
066    
067        TopicConnectionFactory tConFactory;
068    
069        TopicConnection tConnection;
070    
071        AdminObjectWrapper connectionFactoryWrapper, topicWrapper;
072    
073        TopicSession tSession;
074    
075        TopicSubscriber tSubscriber;
076    
077        Topic topic;
078    
079        Thread t;
080    
081        boolean stop;
082    
083        public void run() {
084            try {
085                tConFactory = (TopicConnectionFactory) connectionFactoryWrapper
086                        .$getResource();
087                topic = (Topic) topicWrapper.$getResource();
088                tConnection = tConFactory.createTopicConnection();
089                tConnection.setClientID(subscriberName);
090                tSession = tConnection.createTopicSession(false,
091                        QueueSession.AUTO_ACKNOWLEDGE);
092                tSubscriber = tSession.createDurableSubscriber(topic,
093                        subscriberName);
094                tConnection.start();
095                while (!stop) {
096                    Thread.yield();
097                }
098                if (tSession != null) {
099                    tSession.close();
100                }
101                if (tConnection != null) {
102                    // If the activeMQ connector or container is not running there
103                    // is no need to close the connection.
104                    // Closing the connection would fail anyway.
105                    if (((Integer) kernel.getAttribute(ACTIVEMQ_CONTAINER_OBJNAME,
106                            "state")).intValue() == State.RUNNING_INDEX
107                            && ((Integer) kernel.getAttribute(
108                                    ACTIVEMQ_CONNECTOR_OBJNAME, "state"))
109                                    .intValue() == State.RUNNING_INDEX) {
110                        tConnection.close();
111                    }
112                }
113            } catch (Exception e) {
114                throw new RuntimeException(e);
115            }
116            t = null;
117            log.debug("Worker thread stopped.");
118        }
119    
120        public TopicBrowserGBean(String subscriberName,
121                AdminObjectWrapper connectionFactoryWrapper,
122                AdminObjectWrapper topicWrapper) {
123            this.subscriberName = subscriberName + "@" + this.getClass().getName();
124            this.connectionFactoryWrapper = connectionFactoryWrapper;
125            this.topicWrapper = topicWrapper;
126        }
127    
128        /**
129         * Start the connection on a topic and add a durable subscription.
130         *
131         * @see org.apache.geronimo.gbean.GBeanLifecycle#doStart()
132         */
133        public void doStart() throws WaitingException, Exception {
134            t = new Thread(this);
135            t.start();
136            log.debug("Subscribed to topic.");
137        }
138    
139        /**
140         * Close the connection and unregister durable subscription.
141         *
142         * @see org.apache.geronimo.gbean.GBeanLifecycle#doStop()
143         */
144        public void doStop() throws WaitingException, Exception {
145            stop = true;
146            log.debug("Unsubscribed to topic.");
147        }
148    
149        public void doFail() {
150            stop = true;
151            log.warn("GBean failed.");
152        }
153    
154        /**
155         * Get all the messages since the last call to getMessages(). If this is the
156         * first call returns all the messages sent to the Topic
157         *
158         * @return all the messages since the last call to getMessages() or all the
159         *         messages sent to the topic if this is there was no previous call.
160         * @throws Exception
161         */
162        public List getMessages() throws Exception {
163            List ret = new ArrayList();
164            Message m = null;
165            do {
166                m = tSubscriber.receiveNoWait();
167                if (m != null) {
168                    ret.add(m);
169                }
170            } while (m != null);
171            return ret;
172        }
173    
174        /**
175         * Remove a durable subscription.
176         */
177        public void unsubscribe() throws Exception {
178            if (tSubscriber != null) {
179                tSubscriber.close();
180                if (tSession != null) {
181                    tSession.unsubscribe(subscriberName);
182                    log.debug(subscriberName + " unsubscribed from Topic " + topic.getTopicName() + ".");
183                }
184            }
185        }
186    
187        public static final GBeanInfo GBEAN_INFO;
188    
189        static {
190            GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("Topic Browser GBean", TopicBrowserGBean.class);
191            infoFactory.addAttribute("subscriberName", String.class, true);
192    
193            infoFactory.addReference("ConnectionFactoryWrapper",
194                    AdminObjectWrapper.class);
195            infoFactory.addReference("TopicWrapper", AdminObjectWrapper.class);
196    
197            infoFactory.addOperation("getMessages");
198            infoFactory.addOperation("unsubscribe");
199    
200            infoFactory.setConstructor(new String[] { "subscriberName",
201                    "ConnectionFactoryWrapper", "TopicWrapper" });
202    
203            GBEAN_INFO = infoFactory.getBeanInfo();
204        }
205    
206        public static GBeanInfo getGBeanInfo() {
207            return GBEAN_INFO;
208        }
209    
210    }