/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.geronimo.console.core.jms;

import java.util.ArrayList;
import java.util.List;

import javax.jms.Message;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.connector.AdminObjectWrapper;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoBuilder;
import org.apache.geronimo.gbean.GBeanLifecycle;
import org.apache.geronimo.gbean.WaitingException;
import org.apache.geronimo.kernel.Kernel;
import org.apache.geronimo.kernel.KernelRegistry;
import org.apache.geronimo.kernel.management.State;

/**
 * GBean that holds a durable subscription on a JMS topic so the messages
 * published to it can be polled through {@link #getMessages()}.
 *
 * The JMS connection, session and subscriber are created and closed by a
 * dedicated worker thread (this object is its <code>Runnable</code>). The
 * worker connects, parks until a stop is requested through
 * {@link #doStop()} or {@link #doFail()}, and then releases the JMS
 * resources.
 */
public class TopicBrowserGBean implements GBeanLifecycle, Runnable {

    private static final Log log = LogFactory.getLog(TopicBrowserGBean.class);

    private static final Kernel kernel = KernelRegistry.getSingleKernel();

    // NOTE(review): the trailing "l" in "name=ActiveMQl" looks like a typo
    // for "name=ActiveMQ" - confirm against the deployed JMSServer GBean name
    // before changing it; the literal is deliberately left untouched here.
    private static final ObjectName ACTIVEMQ_CONTAINER_OBJNAME = objectName(
            "geronimo.server:J2EEApplication=null,J2EEModule=org/apache/geronimo/ActiveMQServer,J2EEServer=geronimo,j2eeType=JMSServer,name=ActiveMQl");

    private static final ObjectName ACTIVEMQ_CONNECTOR_OBJNAME = objectName(
            "geronimo.server:J2EEApplication=null,J2EEServer=geronimo,JCAResource=org/apache/geronimo/SystemJMS,j2eeType=JCAManagedConnectionFactory,name=DefaultActiveMQConnectionFactory");

    /** Monitor the worker parks on; guards every change of {@link #stop}. */
    private final Object stopLock = new Object();

    String subscriberName;

    TopicConnectionFactory tConFactory;

    TopicConnection tConnection;

    AdminObjectWrapper connectionFactoryWrapper, topicWrapper;

    // The fields below are written by the worker thread and read by whichever
    // thread calls getMessages()/unsubscribe()/doStart(), hence volatile.
    volatile TopicSession tSession;

    volatile TopicSubscriber tSubscriber;

    volatile Topic topic;

    volatile Thread t;

    volatile boolean stop;

    /**
     * Creates the browser.
     *
     * @param subscriberName base name of the subscription; the class name is
     *            appended and the result is used both as JMS client ID and
     *            as durable subscription name
     * @param connectionFactoryWrapper wrapper whose resource is the
     *            <code>TopicConnectionFactory</code> to connect with
     * @param topicWrapper wrapper whose resource is the <code>Topic</code>
     *            to subscribe to
     */
    public TopicBrowserGBean(String subscriberName,
            AdminObjectWrapper connectionFactoryWrapper,
            AdminObjectWrapper topicWrapper) {
        this.subscriberName = subscriberName + "@" + this.getClass().getName();
        this.connectionFactoryWrapper = connectionFactoryWrapper;
        this.topicWrapper = topicWrapper;
    }

    /**
     * Worker body: connects and subscribes, waits for a stop request, then
     * closes the session and (when ActiveMQ is still running) the connection.
     *
     * The JMS resources are released in a <code>finally</code> block, so a
     * failure while connecting no longer leaves a half-open connection
     * holding the client ID.
     *
     * @throws RuntimeException wrapping any exception raised while
     *             connecting or subscribing
     */
    public void run() {
        try {
            connect();
            awaitStop();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            disconnect();
            t = null;
            log.debug("Worker thread stopped.");
        }
    }

    /**
     * Start the connection on a topic and add a durable subscription. The
     * actual JMS work is done asynchronously by the worker thread.
     *
     * @see org.apache.geronimo.gbean.GBeanLifecycle#doStart()
     */
    public void doStart() throws WaitingException, Exception {
        Thread previous = t;
        if (previous != null && previous != Thread.currentThread()) {
            // A worker from an earlier start may still be winding down. Let it
            // finish first: it must not miss its stop request when the flag is
            // reset below, and two workers must not use the same client ID.
            requestStop();
            previous.join();
        }
        synchronized (stopLock) {
            // Without this reset a restarted GBean would connect and then
            // disconnect immediately, because doStop() left the flag set.
            stop = false;
        }
        Thread worker = new Thread(this);
        t = worker;
        worker.start();
        log.debug("Subscribed to topic.");
    }

    /**
     * Ask the worker thread to close the session and connection. The durable
     * subscription itself is kept; use {@link #unsubscribe()} to remove it.
     *
     * @see org.apache.geronimo.gbean.GBeanLifecycle#doStop()
     */
    public void doStop() throws WaitingException, Exception {
        requestStop();
        log.debug("Unsubscribed to topic.");
    }

    /**
     * Ask the worker thread to shut down after the GBean has failed.
     *
     * @see org.apache.geronimo.gbean.GBeanLifecycle#doFail()
     */
    public void doFail() {
        requestStop();
        log.warn("GBean failed.");
    }

    /**
     * Get all the messages since the last call to getMessages(). If this is
     * the first call, returns all the messages sent to the Topic.
     *
     * @return all the messages since the last call to getMessages(), or all
     *         the messages sent to the topic if there was no previous call;
     *         an empty list when the subscriber has not been created (yet)
     * @throws Exception if the subscriber fails to receive
     */
    public List getMessages() throws Exception {
        List ret = new ArrayList();
        // Snapshot the field: the worker thread creates the subscriber
        // asynchronously, so it may still be null right after doStart().
        TopicSubscriber subscriber = tSubscriber;
        if (subscriber == null) {
            return ret;
        }
        for (Message m = subscriber.receiveNoWait(); m != null;
                m = subscriber.receiveNoWait()) {
            ret.add(m);
        }
        return ret;
    }

    /**
     * Remove a durable subscription. The subscriber is closed first, then
     * the subscription is removed through the session.
     *
     * @throws Exception if closing the subscriber or unsubscribing fails
     */
    public void unsubscribe() throws Exception {
        if (tSubscriber != null) {
            tSubscriber.close();
            if (tSession != null) {
                tSession.unsubscribe(subscriberName);
                log.debug(subscriberName + " unsubscribed from Topic " + topic.getTopicName() + ".");
            }
        }
    }

    /** Builds an ObjectName, logging and returning null when it is malformed. */
    private static ObjectName objectName(String name) {
        try {
            return ObjectName.getInstance(name);
        } catch (MalformedObjectNameException moe) {
            log.warn("Could not initialize ObjectName", moe);
            return null;
        }
    }

    /** Resolves the JMS resources, creates the durable subscriber and starts delivery. */
    private void connect() throws Exception {
        tConFactory = (TopicConnectionFactory) connectionFactoryWrapper
                .$getResource();
        topic = (Topic) topicWrapper.$getResource();
        tConnection = tConFactory.createTopicConnection();
        tConnection.setClientID(subscriberName);
        tSession = tConnection.createTopicSession(false,
                QueueSession.AUTO_ACKNOWLEDGE);
        tSubscriber = tSession.createDurableSubscriber(topic, subscriberName);
        tConnection.start();
    }

    /** Parks the worker until a stop is requested or the thread is interrupted. */
    private void awaitStop() {
        synchronized (stopLock) {
            while (!stop) {
                try {
                    stopLock.wait();
                } catch (InterruptedException ie) {
                    // Treat an interrupt as a stop request and keep the flag
                    // set for whoever inspects the thread afterwards.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Raises the stop flag and wakes the worker parked in awaitStop(). */
    private void requestStop() {
        synchronized (stopLock) {
            stop = true;
            stopLock.notifyAll();
        }
    }

    /**
     * Closes the session and the connection. Each step is attempted on its
     * own and failures are only logged, so one failing close cannot prevent
     * the other or hide the exception that ended the worker.
     */
    private void disconnect() {
        TopicSession session = tSession;
        if (session != null) {
            try {
                session.close();
            } catch (Exception e) {
                log.warn("Could not close topic session", e);
            }
        }
        TopicConnection connection = tConnection;
        if (connection != null) {
            try {
                // If the activeMQ connector or container is not running there
                // is no need to close the connection.
                // Closing the connection would fail anyway.
                if (isRunning(ACTIVEMQ_CONTAINER_OBJNAME)
                        && isRunning(ACTIVEMQ_CONNECTOR_OBJNAME)) {
                    connection.close();
                }
            } catch (Exception e) {
                log.warn("Could not close topic connection", e);
            }
        }
    }

    /** Returns true when the kernel reports the named GBean's "state" as running. */
    private static boolean isRunning(ObjectName name) throws Exception {
        Integer state = (Integer) kernel.getAttribute(name, "state");
        return state.intValue() == State.RUNNING_INDEX;
    }

    public static final GBeanInfo GBEAN_INFO;

    static {
        GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("Topic Browser GBean", TopicBrowserGBean.class);
        infoFactory.addAttribute("subscriberName", String.class, true);

        infoFactory.addReference("ConnectionFactoryWrapper",
                AdminObjectWrapper.class);
        infoFactory.addReference("TopicWrapper", AdminObjectWrapper.class);

        infoFactory.addOperation("getMessages");
        infoFactory.addOperation("unsubscribe");

        infoFactory.setConstructor(new String[] { "subscriberName",
                "ConnectionFactoryWrapper", "TopicWrapper" });

        GBEAN_INFO = infoFactory.getBeanInfo();
    }

    /**
     * @return the static GBean metadata describing this class
     */
    public static GBeanInfo getGBeanInfo() {
        return GBEAN_INFO;
    }

}