001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *  http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing,
013     * software distributed under the License is distributed on an
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     * KIND, either express or implied.  See the License for the
016     * specific language governing permissions and limitations
017     * under the License.
018     */
019    
020    package org.apache.geronimo.gshell.whisper.request;
021    
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.concurrent.ScheduledExecutorService;
025    import java.util.concurrent.ScheduledFuture;
026    import java.util.concurrent.ScheduledThreadPoolExecutor;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.locks.Lock;
029    import java.util.concurrent.locks.ReentrantLock;
030    
031    import org.apache.geronimo.gshell.common.Duration;
032    import org.apache.geronimo.gshell.common.NamedThreadFactory;
033    import org.apache.geronimo.gshell.common.tostring.ToStringBuilder;
034    import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
035    import org.apache.geronimo.gshell.whisper.message.Message;
036    import org.apache.geronimo.gshell.whisper.session.SessionAttributeBinder;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * Manages request state on a per-session basis, handles timeouts and signalling responses.
042     *
043     * @version $Rev: 580691 $ $Date: 2007-09-30 03:36:37 -0700 (Sun, 30 Sep 2007) $
044     */
045    public class RequestManager
046    {
047        public static final SessionAttributeBinder<RequestManager> BINDER = new SessionAttributeBinder<RequestManager>(RequestManager.class);
048    
049        private final Logger log = LoggerFactory.getLogger(getClass());
050    
051        private final Map<Message.ID,Registration> registrations = new HashMap<Message.ID, Registration>();
052    
053        private final ScheduledExecutorService scheduler;
054    
055        //
056        // TODO: Use a better locking scheme...
057        //
058        
059        private final Lock lock = new ReentrantLock();
060    
061        public RequestManager() {
062            ThreadFactory tf = new NamedThreadFactory(getClass());
063            
064            scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, tf);
065        }
066    
067        private Registration get(final Message.ID id) {
068            assert id != null;
069    
070            Registration reg = registrations.get(id);
071    
072            if (reg == null) {
073                throw new NotRegisteredException(id);
074            }
075    
076            return reg;
077        }
078    
079        private Registration remove(final Message.ID id) {
080            assert id != null;
081    
082            Registration reg = registrations.remove(id);
083    
084            if (reg == null) {
085                throw new NotRegisteredException(id);
086            }
087    
088            return reg;
089        }
090    
091        public boolean contains(final Message.ID id) {
092            lock.lock();
093    
094            try {
095                return registrations.containsKey(id);
096            }
097            finally {
098                lock.unlock();
099            }
100        }
101    
102        public void register(final RequestHandle request) {
103            assert request != null;
104    
105            lock.lock();
106    
107            try {
108                Message.ID id = request.getId();
109    
110                if (registrations.containsKey(id)) {
111                    throw new DuplicateRegistrationException(id);
112                }
113    
114                Registration reg = new Registration(request);
115    
116                registrations.put(id, reg);
117    
118                log.debug("Registered: {}", reg);
119            }
120            finally {
121                lock.unlock();
122            }
123        }
124    
125        public RequestHandle lookup(final Message.ID id) {
126            assert id != null;
127    
128            lock.lock();
129    
130            try {
131                Registration reg = get(id);
132    
133                return reg.request;
134            }
135            finally {
136                lock.unlock();
137            }
138        }
139    
140        public RequestHandle deregister(final Message.ID id) {
141            assert id != null;
142    
143            lock.lock();
144    
145            try {
146                Registration reg = remove(id);
147    
148                reg.deactivate();
149    
150                log.debug("Deregistered: {}", reg);
151    
152                return reg.request;
153            }
154            finally {
155                lock.unlock();
156            }
157        }
158    
159        public void activate(final Message.ID id) {
160            assert id != null;
161    
162            lock.lock();
163    
164            try {
165                Registration reg = get(id);
166                
167                reg.activate();
168    
169                log.debug("Activated: {}", reg);
170            }
171            catch (NotRegisteredException e) {
172                // Sometimes we receive responses to requests faster than we can register them
173                log.debug("Ignoring activation; request not registered: {}", id);
174            }
175            finally {
176                lock.unlock();
177            }
178        }
179    
180        public void deactivate(final Message.ID id) {
181            assert id != null;
182    
183            lock.lock();
184    
185            try {
186                Registration reg = get(id);
187    
188                reg.deactivate();
189    
190                log.debug("Deactivated: {}", reg);
191            }
192            catch (NotRegisteredException e) {
193                log.debug("Ignoring deactivation; request not registered: {}", id);
194            }
195            finally {
196                lock.unlock();
197            }
198        }
199    
200        private void timeout(final Message.ID id) {
201            assert id != null;
202    
203            lock.lock();
204    
205            try {
206                Registration reg = remove(id);
207    
208                reg.timeout();
209    
210                log.debug("Timed out: {}", reg);
211            }
212            catch (NotRegisteredException e) {
213                log.debug("Ignoring timeout; request not registered: {}", id);
214            }
215            catch (TimeoutAbortedException e) {
216                log.debug("Timeout aborted: " + e.getMessage());
217            }
218            finally {
219                lock.unlock();
220            }
221        }
222    
223        public void close() {
224            lock.lock();
225    
226            try {
227                if (!registrations.isEmpty()) {
228                    log.warn("Timing out remaining {} registrations", registrations.size());
229    
230                    for (Registration reg : registrations.values()) {
231                        timeout(reg.request.getId());
232                    }
233                }
234    
235                //
236                // FIXME: This causes some problems when a rsh client closes, like:
237                //
238                //        java.security.AccessControlException: access denied (java.lang.RuntimePermission modifyThread)
239                //
240                // scheduler.shutdown();
241            }
242            finally {
243                lock.unlock();
244            }
245        }
246    
247        private enum RegistrationState
248        {
249            PENDING,
250            ACTIVE,
251            DEACTIVE,
252            TIMEDOUT
253        }
254    
255        private class Registration
256        {
257            public final RequestHandle request;
258    
259            public RegistrationState state = RegistrationState.PENDING;
260    
261            private ScheduledFuture<?> timeoutFuture;
262    
263            public Registration(final RequestHandle request) {
264                assert request != null;
265    
266                this.request = request;
267            }
268    
269            public void activate() {
270                if (state != RegistrationState.PENDING) {
271                    log.debug("Can not activate, state is not PENDING, found: {}", state);
272                }
273                else {
274                    Runnable task = new Runnable() {
275                        public void run() {
276                            RequestManager.this.timeout(request.getId());
277                        }
278                    };
279    
280                    Duration timeout = request.getTimeout();
281    
282                    log.debug("Scheduling timeout to trigger in: {}", timeout);
283    
284                    timeoutFuture = scheduler.schedule(task, timeout.getValue(), timeout.getUnit());
285    
286                    state = RegistrationState.ACTIVE;
287                }
288            }
289    
290            public void deactivate() {
291                if (state != RegistrationState.ACTIVE) {
292                    log.debug("Can not deactivate; state is not ACTIVE, found: {}", state);
293                }
294                else if (timeoutFuture.cancel(false)) {
295                    timeoutFuture = null;
296    
297                    state = RegistrationState.DEACTIVE;
298                }
299                else {
300                    log.warn("Unable to cancel registration timeout: {}", this);
301                }
302            }
303    
304            public void timeout() {
305                Message.ID id = request.getId();
306    
307                if (timeoutFuture.isCancelled()) {
308                    throw new TimeoutAbortedException("Timeout has been canceled: " + id);
309                }
310                else if (request.isSignaled()) {
311                    throw new TimeoutAbortedException("Request has been singled: " + id);
312                }
313                else {
314                    request.timeout();
315    
316                    state = RegistrationState.TIMEDOUT;
317                }
318            }
319    
320            public String toString() {
321                return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
322                        .append("id", request.getId())
323                        .append("state", state)
324                        .toString();
325            }
326        }
327    
328        public class NotRegisteredException
329            extends RequestException
330        {
331            public NotRegisteredException(final Message.ID id) {
332                super(id);
333            }
334        }
335    
336        public class DuplicateRegistrationException
337            extends RequestException
338        {
339            public DuplicateRegistrationException(final Message.ID id) {
340                super(id);
341            }
342        }
343    
344        public class TimeoutAbortedException
345            extends RequestException
346        {
347            public TimeoutAbortedException(final String msg) {
348                super(msg);
349            }
350        }
351    }