001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020 package org.apache.geronimo.gshell.whisper.request;
021
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.concurrent.ScheduledExecutorService;
025 import java.util.concurrent.ScheduledFuture;
026 import java.util.concurrent.ScheduledThreadPoolExecutor;
027 import java.util.concurrent.ThreadFactory;
028 import java.util.concurrent.locks.Lock;
029 import java.util.concurrent.locks.ReentrantLock;
030
031 import org.apache.geronimo.gshell.common.Duration;
032 import org.apache.geronimo.gshell.common.NamedThreadFactory;
033 import org.apache.geronimo.gshell.common.tostring.ToStringBuilder;
034 import org.apache.geronimo.gshell.common.tostring.ToStringStyle;
035 import org.apache.geronimo.gshell.whisper.message.Message;
036 import org.apache.geronimo.gshell.whisper.session.SessionAttributeBinder;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * Manages request state on a per-session basis, handles timeouts and signalling responses.
042 *
043 * @version $Rev: 580691 $ $Date: 2007-09-30 03:36:37 -0700 (Sun, 30 Sep 2007) $
044 */
045 public class RequestManager
046 {
047 public static final SessionAttributeBinder<RequestManager> BINDER = new SessionAttributeBinder<RequestManager>(RequestManager.class);
048
049 private final Logger log = LoggerFactory.getLogger(getClass());
050
051 private final Map<Message.ID,Registration> registrations = new HashMap<Message.ID, Registration>();
052
053 private final ScheduledExecutorService scheduler;
054
055 //
056 // TODO: Use a better locking scheme...
057 //
058
059 private final Lock lock = new ReentrantLock();
060
061 public RequestManager() {
062 ThreadFactory tf = new NamedThreadFactory(getClass());
063
064 scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, tf);
065 }
066
067 private Registration get(final Message.ID id) {
068 assert id != null;
069
070 Registration reg = registrations.get(id);
071
072 if (reg == null) {
073 throw new NotRegisteredException(id);
074 }
075
076 return reg;
077 }
078
079 private Registration remove(final Message.ID id) {
080 assert id != null;
081
082 Registration reg = registrations.remove(id);
083
084 if (reg == null) {
085 throw new NotRegisteredException(id);
086 }
087
088 return reg;
089 }
090
091 public boolean contains(final Message.ID id) {
092 lock.lock();
093
094 try {
095 return registrations.containsKey(id);
096 }
097 finally {
098 lock.unlock();
099 }
100 }
101
102 public void register(final RequestHandle request) {
103 assert request != null;
104
105 lock.lock();
106
107 try {
108 Message.ID id = request.getId();
109
110 if (registrations.containsKey(id)) {
111 throw new DuplicateRegistrationException(id);
112 }
113
114 Registration reg = new Registration(request);
115
116 registrations.put(id, reg);
117
118 log.debug("Registered: {}", reg);
119 }
120 finally {
121 lock.unlock();
122 }
123 }
124
125 public RequestHandle lookup(final Message.ID id) {
126 assert id != null;
127
128 lock.lock();
129
130 try {
131 Registration reg = get(id);
132
133 return reg.request;
134 }
135 finally {
136 lock.unlock();
137 }
138 }
139
140 public RequestHandle deregister(final Message.ID id) {
141 assert id != null;
142
143 lock.lock();
144
145 try {
146 Registration reg = remove(id);
147
148 reg.deactivate();
149
150 log.debug("Deregistered: {}", reg);
151
152 return reg.request;
153 }
154 finally {
155 lock.unlock();
156 }
157 }
158
159 public void activate(final Message.ID id) {
160 assert id != null;
161
162 lock.lock();
163
164 try {
165 Registration reg = get(id);
166
167 reg.activate();
168
169 log.debug("Activated: {}", reg);
170 }
171 catch (NotRegisteredException e) {
172 // Sometimes we receive responses to requests faster than we can register them
173 log.debug("Ignoring activation; request not registered: {}", id);
174 }
175 finally {
176 lock.unlock();
177 }
178 }
179
180 public void deactivate(final Message.ID id) {
181 assert id != null;
182
183 lock.lock();
184
185 try {
186 Registration reg = get(id);
187
188 reg.deactivate();
189
190 log.debug("Deactivated: {}", reg);
191 }
192 catch (NotRegisteredException e) {
193 log.debug("Ignoring deactivation; request not registered: {}", id);
194 }
195 finally {
196 lock.unlock();
197 }
198 }
199
200 private void timeout(final Message.ID id) {
201 assert id != null;
202
203 lock.lock();
204
205 try {
206 Registration reg = remove(id);
207
208 reg.timeout();
209
210 log.debug("Timed out: {}", reg);
211 }
212 catch (NotRegisteredException e) {
213 log.debug("Ignoring timeout; request not registered: {}", id);
214 }
215 catch (TimeoutAbortedException e) {
216 log.debug("Timeout aborted: " + e.getMessage());
217 }
218 finally {
219 lock.unlock();
220 }
221 }
222
223 public void close() {
224 lock.lock();
225
226 try {
227 if (!registrations.isEmpty()) {
228 log.warn("Timing out remaining {} registrations", registrations.size());
229
230 for (Registration reg : registrations.values()) {
231 timeout(reg.request.getId());
232 }
233 }
234
235 //
236 // FIXME: This causes some problems when a rsh client closes, like:
237 //
238 // java.security.AccessControlException: access denied (java.lang.RuntimePermission modifyThread)
239 //
240 // scheduler.shutdown();
241 }
242 finally {
243 lock.unlock();
244 }
245 }
246
247 private enum RegistrationState
248 {
249 PENDING,
250 ACTIVE,
251 DEACTIVE,
252 TIMEDOUT
253 }
254
255 private class Registration
256 {
257 public final RequestHandle request;
258
259 public RegistrationState state = RegistrationState.PENDING;
260
261 private ScheduledFuture<?> timeoutFuture;
262
263 public Registration(final RequestHandle request) {
264 assert request != null;
265
266 this.request = request;
267 }
268
269 public void activate() {
270 if (state != RegistrationState.PENDING) {
271 log.debug("Can not activate, state is not PENDING, found: {}", state);
272 }
273 else {
274 Runnable task = new Runnable() {
275 public void run() {
276 RequestManager.this.timeout(request.getId());
277 }
278 };
279
280 Duration timeout = request.getTimeout();
281
282 log.debug("Scheduling timeout to trigger in: {}", timeout);
283
284 timeoutFuture = scheduler.schedule(task, timeout.getValue(), timeout.getUnit());
285
286 state = RegistrationState.ACTIVE;
287 }
288 }
289
290 public void deactivate() {
291 if (state != RegistrationState.ACTIVE) {
292 log.debug("Can not deactivate; state is not ACTIVE, found: {}", state);
293 }
294 else if (timeoutFuture.cancel(false)) {
295 timeoutFuture = null;
296
297 state = RegistrationState.DEACTIVE;
298 }
299 else {
300 log.warn("Unable to cancel registration timeout: {}", this);
301 }
302 }
303
304 public void timeout() {
305 Message.ID id = request.getId();
306
307 if (timeoutFuture.isCancelled()) {
308 throw new TimeoutAbortedException("Timeout has been canceled: " + id);
309 }
310 else if (request.isSignaled()) {
311 throw new TimeoutAbortedException("Request has been singled: " + id);
312 }
313 else {
314 request.timeout();
315
316 state = RegistrationState.TIMEDOUT;
317 }
318 }
319
320 public String toString() {
321 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
322 .append("id", request.getId())
323 .append("state", state)
324 .toString();
325 }
326 }
327
328 public class NotRegisteredException
329 extends RequestException
330 {
331 public NotRegisteredException(final Message.ID id) {
332 super(id);
333 }
334 }
335
336 public class DuplicateRegistrationException
337 extends RequestException
338 {
339 public DuplicateRegistrationException(final Message.ID id) {
340 super(id);
341 }
342 }
343
344 public class TimeoutAbortedException
345 extends RequestException
346 {
347 public TimeoutAbortedException(final String msg) {
348 super(msg);
349 }
350 }
351 }