View Javadoc
/* Reattore HTTP Server

   Copyright (C) 2002 Michael Hope <michaelh@juju.net.nz>

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

   $Id: CombinedReactor.java,v 1.14 2003/03/05 04:31:56 michaelh Exp $
*/

package juju.reattore.core.reactor.impl;

import java.net.*;
import java.io.*;
import java.util.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;

import juju.reattore.core.reactor.*;
import juju.reattore.util.StatRegistry;
import org.apache.commons.logging.*;

/** Base implementation of a combined server/client reactor.

    A single {@link Selector} multiplexes both listening sockets
    (accept events) and client sockets (connect, read and write
    events).  Ready keys are dispatched to the handler stored as the
    key's attachment.

    @group Reactor
    @tag reactor
    @children Server
 */
public class CombinedReactor
    implements ServerSocketReactor, ClientSocketReactor {

    private static final Log log = LogFactory.getLog(CombinedReactor.class);

    private final Selector selector;

    /* Set by stop()/close(), which may be called from a thread other
       than the one running go(); volatile so the loop sees the update. */
    private volatile boolean shouldStop;
    private volatile boolean shouldClose;

    /* Tasks waiting to be run by the reactor loop.  Every access is
       made while holding the list's own monitor. */
    private final List queue = new LinkedList();

    private int statsInterval = 0;
    private long nextStats;

    /** Creates a new reactor.

        @throws IOException if a selector cannot be opened (fatal)
     */
    public CombinedReactor()
        throws IOException {

        selector = SelectorProvider.provider().openSelector();
        assert selector != null;
    }

    /** Sets how often the statistics are dumped to the log.  The
        interval is not guaranteed: statistics are only considered
        after the selector returns inside {@link #go()}.

        @param interval The time in ms, or <=0 to disable.
     */
    public void setStatsInterval(int interval) {
        statsInterval = interval;
    }

    /** Logs the statistics summary if reporting is enabled and the
        current interval has elapsed, then schedules the next report.
     */
    private void dumpStats() {
        if (statsInterval > 0) {
            long now = System.currentTimeMillis();

            if (now > nextStats) {
                log.info(StatRegistry.summarise());

                nextStats = now + statsInterval;
            }
        }
    }

    /** Hands over execution to this reactor.  The reactor will run
        until {@link #stop()} or {@link #close()} is called.  After
        {@link #close()} it keeps polling until no keys remain
        registered with the selector.

        @throws IOException if an unspecified (fatal) error occurs.
     */
    public void go()
        throws IOException {

        while (!shouldStop && !shouldClose) {
            addPending();
            if (selector.select() > 0) {
                process();
            }

            dumpStats();
        }

        if (shouldClose) {
            /* poll() runs any pending tasks itself before selecting. */
            do {
                poll();
            } while (!selector.keys().isEmpty());
        }
    }

    /** Causes this reactor to asynchronously stop.  Will not stop a
        stuck reactor.
     */
    public void stop() {
        shouldStop = true;
        selector.wakeup();
    }

    /** Causes this reactor to stop once all of the handlers are done.
        May result in high CPU load.  May not stop.
     */
    public void close() {
        shouldClose = true;
        selector.wakeup();
    }

    /** Polls the list of sockets and processes any that have become
        active.  Never blocks waiting for activity.

        @throws IOException on a fatal error PENDING
     */
    public void poll()
        throws IOException {

        addPending();
        if (selector.selectNow() > 0) {
            process();
        }
    }

    /** Runs every task that was queued through runLater().

        The queue is snapshotted and cleared while holding its lock,
        and the tasks are then run with the lock released.  A task may
        therefore queue further work (for example a handler that calls
        connect() from handleConnected()) without modifying the list
        that is being iterated; such work runs on the next call.  A
        task is removed before it runs, so one that throws is not run
        again.
     */
    private void addPending() {
        Runnable[] pending;

        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
            pending = (Runnable[])queue.toArray(new Runnable[queue.size()]);
            queue.clear();
        }

        for (int i = 0; i < pending.length; i++) {
            pending[i].run();
        }
    }

    /** Dispatches every key in the selector's selected set to the
        handler attached to it.

        @throws IOException if accepting a connection or a handler
        callback fails fatally.
     */
    private void process()
        throws IOException {

        Iterator ready = selector.selectedKeys().iterator();

        while (ready.hasNext()) {
            SelectionKey key = (SelectionKey)ready.next();
            ready.remove();

            if (!key.isValid()) {
                /* Cancelled, or its channel was closed, by a handler
                   that ran earlier in this pass.  Querying the ready
                   set of such a key throws CancelledKeyException. */
                continue;
            }

            /* Server sockets should only accept, clients should only
               connect, read or write. */
            assert key.isAcceptable()
                ^ (key.isConnectable() | key.isReadable() | key.isWritable());

            if (key.isAcceptable()) {
                processAccept(key);
            }
            else if (key.isConnectable()) {
                processConnect(key);
            }
            else if (key.isReadable() || key.isWritable()) {
                processReadWrite(key);
            }
            else {
                /* Can't happen */
                assert false;
            }
        }
    }

    /** Accepts a pending connection and hands it to the server handler. */
    private void processAccept(SelectionKey key)
        throws IOException {

        ServerSocketHandler handler = (ServerSocketHandler)key.attachment();
        ServerSocketChannel ssch = (ServerSocketChannel)key.channel();
        SocketChannel ch = ssch.accept();

        /* accept() on a non-blocking channel returns null when no
           connection is pending any more. */
        if (ch != null) {
            handler.handleNewClient(ch);
        }
    }

    /** Completes a non-blocking connect and notifies the client handler. */
    private void processConnect(SelectionKey key)
        throws IOException {

        ClientSocketHandler handler = (ClientSocketHandler)key.attachment();
        SocketChannel ch = (SocketChannel)key.channel();

        try {
            boolean success = ch.finishConnect();

            if (!success) {
                /* In theory this can't happen.
                   PENDING */
                handler.handleError();
            }
            else {
                boolean finished = handler.handleConnected();

                if (finished) {
                    /* PENDING */
                    assert false;
                }
                else {
                    key.interestOps(handler.getInterestOps());
                }
            }
        }
        catch (IOException ex) {
            try {
                ch.close();
            }
            catch (IOException ex2) {
                log.info("Exception while closing", ex2);
            }

            handler.handleError();
            log.info("Exception while connecting", ex);
        }
    }

    /** Lets the client handler service a readable and/or writable channel. */
    private void processReadWrite(SelectionKey key)
        throws IOException {

        ClientSocketHandler handler = (ClientSocketHandler)key.attachment();
        SocketChannel ch = (SocketChannel)key.channel();

        try {
            boolean finished = false;

            /* Could be both readable and writable at the same time */
            if (key.isReadable()) {
                finished = handler.handleReadable();
            }
            if (!finished && key.isWritable()) {
                finished = handler.handleWritable();
            }

            /* When the handler reports it is finished the key should
               already be cancelled, which removes it from the
               selector, so there is nothing left to update. */
            if (!finished) {
                key.interestOps(handler.getInterestOps());
            }
        }
        catch (IOException ex) {
            try {
                ch.close();
            }
            catch (IOException ex2) {
                log.info("Exception while closing", ex2);
            }

            handler.handleError();
            log.info("Exception while processing", ex);
            /* PENDING */
        }
    }

    /** Queues a task to be run by the reactor loop and wakes the
        selector so that the task is picked up promptly.
     */
    private void runLater(Runnable toRun) {
        synchronized (queue) {
            queue.add(toRun);
        }
        selector.wakeup();
    }

    /** Registers a server channel with the selector and attaches its
        handler to the resulting key.

        @see ServerSocketReactor
     */
    public void attach(ServerSocketChannel ch, ServerSocketHandler handler)
        throws IOException {

        assert ch != null;
        assert handler != null;

        /* We have a channel.  Bind it into our selector. */
        SelectionKey key = ch.register(selector, handler.getInterestOps());
        key.attach(handler);
    }

    /** Registers a client channel with the selector and attaches its
        handler to the resulting key.

        @see ClientSocketReactor
     */
    public void attach(SocketChannel ch, ClientSocketHandler handler)
        throws IOException {

        assert ch != null;
        assert handler != null;

        /* We have a channel.  Bind it into our selector. */
        SelectionKey key = ch.register(selector, handler.getInterestOps());
        key.attach(handler);
    }

    /** Queues a connect of the channel to the given address.  The
        registration and the connect itself are performed later by
        the reactor loop.  A failure is logged, the channel is closed
        and the handler's handleError() is invoked.

        @see ClientSocketReactor
     */
    public void connect(final SocketChannel ch, final SocketAddress addr,
                        final ClientSocketHandler handler)
        throws IOException {

        assert ch != null;
        assert addr != null;
        assert handler != null;

        runLater(new Runnable() {
                public void run() {
                    try {
                        SelectionKey key = ch.register(selector, handler.getInterestOps());
                        key.attach(handler);

                        if (ch.connect(addr)) {
                            /* Connected immediately */
                            boolean finished = handler.handleConnected();

                            /* May no longer be interested in connected
                               events.  A handler that is finished, or
                               that cancelled its key, has nothing left
                               to update. */
                            if (!finished && key.isValid()) {
                                key.interestOps(handler.getInterestOps());
                            }
                        }
                        else {
                            /* Will connect later */
                        }
                    }
                    catch (Exception ex) {
                        /* Mirror the error path used for connects that
                           complete through the selector: release the
                           channel and tell the handler. */
                        log.info("Exception while connecting", ex);

                        try {
                            ch.close();
                        }
                        catch (IOException ex2) {
                            log.info("Exception while closing", ex2);
                        }

                        try {
                            handler.handleError();
                        }
                        catch (Exception ex2) {
                            log.info("Exception while reporting connect failure", ex2);
                        }
                    }
                }});
    }
}

This page was automatically generated by Maven