001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.console.core.jms;
019
020 import java.util.ArrayList;
021 import java.util.List;
022
023 import javax.jms.Message;
024 import javax.jms.QueueSession;
025 import javax.jms.Topic;
026 import javax.jms.TopicConnection;
027 import javax.jms.TopicConnectionFactory;
028 import javax.jms.TopicSession;
029 import javax.jms.TopicSubscriber;
030 import javax.management.MalformedObjectNameException;
031 import javax.management.ObjectName;
032
033 import org.apache.commons.logging.Log;
034 import org.apache.commons.logging.LogFactory;
035 import org.apache.geronimo.connector.AdminObjectWrapper;
036 import org.apache.geronimo.gbean.GBeanInfo;
037 import org.apache.geronimo.gbean.GBeanInfoBuilder;
038 import org.apache.geronimo.gbean.GBeanLifecycle;
039 import org.apache.geronimo.gbean.WaitingException;
040 import org.apache.geronimo.kernel.Kernel;
041 import org.apache.geronimo.kernel.KernelRegistry;
042 import org.apache.geronimo.kernel.management.State;
043
044 public class TopicBrowserGBean implements GBeanLifecycle, Runnable {
045
046 private static Log log = LogFactory.getLog(TopicBrowserGBean.class);
047
048 private static Kernel kernel = KernelRegistry.getSingleKernel();
049
050 static {
051 try {
052 ACTIVEMQ_CONTAINER_OBJNAME = ObjectName
053 .getInstance("geronimo.server:J2EEApplication=null,J2EEModule=org/apache/geronimo/ActiveMQServer,J2EEServer=geronimo,j2eeType=JMSServer,name=ActiveMQl");
054 ACTIVEMQ_CONNECTOR_OBJNAME = ObjectName
055 .getInstance("geronimo.server:J2EEApplication=null,J2EEServer=geronimo,JCAResource=org/apache/geronimo/SystemJMS,j2eeType=JCAManagedConnectionFactory,name=DefaultActiveMQConnectionFactory");
056 } catch (MalformedObjectNameException moe) {
057 log.warn("Could not initialize ObjectName", moe);
058 }
059 }
060
061 private static ObjectName ACTIVEMQ_CONTAINER_OBJNAME;
062
063 private static ObjectName ACTIVEMQ_CONNECTOR_OBJNAME;
064
065 String subscriberName;
066
067 TopicConnectionFactory tConFactory;
068
069 TopicConnection tConnection;
070
071 AdminObjectWrapper connectionFactoryWrapper, topicWrapper;
072
073 TopicSession tSession;
074
075 TopicSubscriber tSubscriber;
076
077 Topic topic;
078
079 Thread t;
080
081 boolean stop;
082
083 public void run() {
084 try {
085 tConFactory = (TopicConnectionFactory) connectionFactoryWrapper
086 .$getResource();
087 topic = (Topic) topicWrapper.$getResource();
088 tConnection = tConFactory.createTopicConnection();
089 tConnection.setClientID(subscriberName);
090 tSession = tConnection.createTopicSession(false,
091 QueueSession.AUTO_ACKNOWLEDGE);
092 tSubscriber = tSession.createDurableSubscriber(topic,
093 subscriberName);
094 tConnection.start();
095 while (!stop) {
096 Thread.yield();
097 }
098 if (tSession != null) {
099 tSession.close();
100 }
101 if (tConnection != null) {
102 // If the activeMQ connector or container is not running there
103 // is no need to close the connection.
104 // Closing the connection would fail anyway.
105 if (((Integer) kernel.getAttribute(ACTIVEMQ_CONTAINER_OBJNAME,
106 "state")).intValue() == State.RUNNING_INDEX
107 && ((Integer) kernel.getAttribute(
108 ACTIVEMQ_CONNECTOR_OBJNAME, "state"))
109 .intValue() == State.RUNNING_INDEX) {
110 tConnection.close();
111 }
112 }
113 } catch (Exception e) {
114 throw new RuntimeException(e);
115 }
116 t = null;
117 log.debug("Worker thread stopped.");
118 }
119
120 public TopicBrowserGBean(String subscriberName,
121 AdminObjectWrapper connectionFactoryWrapper,
122 AdminObjectWrapper topicWrapper) {
123 this.subscriberName = subscriberName + "@" + this.getClass().getName();
124 this.connectionFactoryWrapper = connectionFactoryWrapper;
125 this.topicWrapper = topicWrapper;
126 }
127
128 /**
129 * Start the connection on a topic and add a durable subscription.
130 *
131 * @see org.apache.geronimo.gbean.GBeanLifecycle#doStart()
132 */
133 public void doStart() throws WaitingException, Exception {
134 t = new Thread(this);
135 t.start();
136 log.debug("Subscribed to topic.");
137 }
138
139 /**
140 * Close the connection and unregister durable subscription.
141 *
142 * @see org.apache.geronimo.gbean.GBeanLifecycle#doStop()
143 */
144 public void doStop() throws WaitingException, Exception {
145 stop = true;
146 log.debug("Unsubscribed to topic.");
147 }
148
149 public void doFail() {
150 stop = true;
151 log.warn("GBean failed.");
152 }
153
154 /**
155 * Get all the messages since the last call to getMessages(). If this is the
156 * first call returns all the messages sent to the Topic
157 *
158 * @return all the messages since the last call to getMessages() or all the
159 * messages sent to the topic if this is there was no previous call.
160 * @throws Exception
161 */
162 public List getMessages() throws Exception {
163 List ret = new ArrayList();
164 Message m = null;
165 do {
166 m = tSubscriber.receiveNoWait();
167 if (m != null) {
168 ret.add(m);
169 }
170 } while (m != null);
171 return ret;
172 }
173
174 /**
175 * Remove a durable subscription.
176 */
177 public void unsubscribe() throws Exception {
178 if (tSubscriber != null) {
179 tSubscriber.close();
180 if (tSession != null) {
181 tSession.unsubscribe(subscriberName);
182 log.debug(subscriberName + " unsubscribed from Topic " + topic.getTopicName() + ".");
183 }
184 }
185 }
186
187 public static final GBeanInfo GBEAN_INFO;
188
189 static {
190 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("Topic Browser GBean", TopicBrowserGBean.class);
191 infoFactory.addAttribute("subscriberName", String.class, true);
192
193 infoFactory.addReference("ConnectionFactoryWrapper",
194 AdminObjectWrapper.class);
195 infoFactory.addReference("TopicWrapper", AdminObjectWrapper.class);
196
197 infoFactory.addOperation("getMessages");
198 infoFactory.addOperation("unsubscribe");
199
200 infoFactory.setConstructor(new String[] { "subscriberName",
201 "ConnectionFactoryWrapper", "TopicWrapper" });
202
203 GBEAN_INFO = infoFactory.getBeanInfo();
204 }
205
206 public static GBeanInfo getGBeanInfo() {
207 return GBEAN_INFO;
208 }
209
210 }