/[cvs]/joko/TestArea/perl/runtime/POE/net/server_preforked.pl
ViewVC logotype

Contents of /joko/TestArea/perl/runtime/POE/net/server_preforked.pl

Parent Directory | Revision Log


Revision 1.1 - (show annotations)
Sun May 11 21:48:54 2003 UTC (21 years, 4 months ago) by joko
Branch: MAIN
CVS Tags: HEAD
File MIME type: text/plain
initial commit

1 #!/usr/bin/perl -w
2 # $Id: preforkedserver.perl,v 1.12 2001/07/24 20:15:35 rcaputo Exp $
3
4 # This is a proof of concept for pre-forking POE servers. It
5 # maintains a pool of server processes (one master plus forked
6 # children). At some point, it would be nice to make the server pool
7 # management a reusable wheel.
8
9 use strict;
10 use lib '..';
11 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW Filter::Line);
12
13 use Data::Dumper;
14
15 ###############################################################################
16 # This is a pre-forked server's session object. It is given a handle
17 # from the server and processes transactions on it.
18
19 package PreforkedSession;
20
21 use strict;
22 use POE::Session;
23
# Debug switch for the PreforkedSession package.  While this returns a
# true value, the session handlers print "$$: ..." trace lines.
sub DEBUG {
    return 1;
}
25
26 #------------------------------------------------------------------------------
27 # Create the preforked server session, and give it to POE to manage.
28
sub new {
    # Arguments: the class name, the accepted client socket, the peer's
    # address (already formatted by the caller) and the peer's port.
    # The fourth argument is the port, not a host name: the server passes
    # $peer_port here and _start stores ARG2 as the port.
    my ($type, $socket, $peer_addr, $peer_port) = @_;
    my $self = bless {}, $type;

    # create() with object_states is the constructor documented by
    # POE::Session; it maps each listed event onto the same-named method
    # of $self, exactly as the legacy new($object, [events], [args]) did.
    POE::Session->create(
        object_states => [
            $self => [ qw( _start _stop command error flushed ) ],
        ],
        #          ARG0,    ARG1,       ARG2
        args => [ $socket, $peer_addr, $peer_port ],
    );

    # POE owns the session from here on; callers get undef back, as before.
    return undef;
}
40
41 #------------------------------------------------------------------------------
42 # This state accepts POE's standard _start event and starts processing
43 # I/O on the client socket
44
sub _start {
    # ARG0..ARG2 are the values handed over by the constructor: the client
    # socket, the peer address and the peer port.
    my $heap = $_[HEAP];
    my ($client_socket, $client_addr, $client_port) = @_[ARG0, ARG1, ARG2];

    # Keep the peer details around so later log lines can name the client.
    $heap->{addr} = $client_addr;
    $heap->{port} = $client_port;

    # Describe the reader/writer first, then build it in one call.
    my %wheel_config = (
        Handle       => $client_socket,            # I/O happens on this socket
        Driver       => POE::Driver::SysRW->new,   # via sysread and syswrite
        Filter       => POE::Filter::Line->new,    # parsed as lines
        InputEvent   => 'command',                 # event for each input line
        ErrorEvent   => 'error',                   # event for I/O errors
        FlushedEvent => 'flushed',                 # event once output is flushed
    );
    $heap->{wheel} = POE::Wheel::ReadWrite->new(%wheel_config);

    if (DEBUG) {
        print "$$: handling connection from $heap->{addr} : $heap->{port}\n";
    }
}
63
64 #------------------------------------------------------------------------------
65 # Accept POE's standard _stop event, and log the close.
66
sub _stop {
    # Log which client this session was serving when it shut down.
    my $session_heap = $_[HEAP];

    if (DEBUG) {
        print "$$: session $session_heap->{addr} : $session_heap->{port} has stopped\n";
    }
}
71
72 #------------------------------------------------------------------------------
73 # This state is invoked by the ReadWrite wheel for each complete
74 # request it receives.
75
sub command {
    # ARG0 is one complete input line delivered by the ReadWrite wheel.
    my $request = $_[ARG0];
    my $wheel   = $_[HEAP]->{wheel};

    # The "protocol" is a plain echo of whatever the client sent.
    $wheel->put("Echo: $request");
}
81
82 #------------------------------------------------------------------------------
83 # This state is invoked when the ReadWrite wheel encounters an I/O
84 # error. It logs the error, and shuts down the session.
85
sub error {
    # ARG0..ARG2: the failed operation's name, the numeric error and its
    # string form, as reported by the ReadWrite wheel.
    my $heap = $_[HEAP];
    my ($failed_op, $err_number, $err_text) = @_[ARG0, ARG1, ARG2];

    # A zero error number is not reported; anything else is logged.
    warn "$$: connection with $heap->{addr} : $heap->{port} encountered "
       . "$failed_op error $err_number: $err_text\n"
        if $err_number;

    # Dropping the wheel stops the session.
    delete $heap->{wheel};
}
97
98 #------------------------------------------------------------------------------
99 # This state is invoked when the session's response has been flushed
100 # to the socket. Since the "protocol" specifies one transaction per
101 # socket, it shuts down the ReadWrite wheel, ending the session.
102
sub flushed {
    my $session_heap = $_[HEAP];

    if (DEBUG) {
        print "$$: response sent to $session_heap->{addr} : $session_heap->{port}\n";
    }

    # One transaction per socket: release the wheel now that the response
    # has been flushed, which ends the session.
    delete $session_heap->{wheel};
}
108
109 ###############################################################################
110 # This is a pre-forked server object. It creates a listening socket,
111 # then forks off many child processes to handle connections. This
112 # differs from the PCB pre-forking server example in that the parent
113 # process continues to accept requests.
114
115 package PreforkedServer;
116
117 use strict;
118 use Socket;
119 use POSIX qw(ECHILD EAGAIN);
120 use POE::Session;
121
122 use Data::Dumper;
123
# Debug switch for the PreforkedServer package.  While this returns a
# true value, the server handlers print "$$: ..." trace lines.
sub DEBUG {
    return 1;
}
125
126 #------------------------------------------------------------------------------
127 # Create the preforked server, and give it to POE to manage.
128
sub new {
    # Arguments: the class name and the total number of server processes
    # wanted (the master counts as one of them).
    my ($type, $processes) = @_;
    my $self = bless {}, $type;

    # create() with object_states is the constructor documented by
    # POE::Session; each listed event maps onto the same-named method of
    # $self.  'error' is registered as well, because the SocketFactory wheel
    # built in _start names it as its FailureEvent; without a handler those
    # failures would go unreported.
    POE::Session->create(
        object_states => [
            $self => [ qw(_start _stop fork retry signal connection error) ],
        ],
        #          ARG0
        args => [ $processes ],
    );

    # POE owns the session from here on; callers get undef back, as before.
    return undef;
}

#------------------------------------------------------------------------------
# This state is invoked when the SocketFactory wheel reports a failure.
# It only logs the problem so that, for example, a failed bind on the
# listening port is visible instead of being silently discarded.

sub error {
    # ARG0..ARG2: the failed operation's name, the numeric error and its
    # string form.
    my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2];

    warn "$$: server socket encountered $operation error $errnum: $errstr\n";
}
139
140 #------------------------------------------------------------------------------
141 # Accept POE's standard _start event, and start the server processes.
142
sub _start {
    # ARG0 is the requested number of server processes, master included.
    my ($kernel, $heap, $processes) = @_[KERNEL, HEAP, ARG0];

    # Listening socket: connections and failures are reported as events.
    $heap->{wheel} = POE::Wheel::SocketFactory->new(
        BindPort     => 8888,            # bind on this port
        SuccessEvent => 'connection',    # event for each accepted connection
        FailureEvent => 'error',         # event for socket errors
    );

    # Both signals are routed to the same 'signal' state.
    for my $signal_name (qw(CHLD INT)) {
        $kernel->sig($signal_name, 'signal');
    }

    # Bookkeeping: known children, forks still owed, and the flag that
    # changes behaviour once a process is a forked child.
    $heap->{children}       = {};
    $heap->{'failed forks'} = 0;
    $heap->{'is a child'}   = 0;

    # The master is process number one, so queue one 'fork' event for each
    # of the remaining processes.  yield() posts to this same session.
    $kernel->yield('fork') for 2 .. $processes;

    print "$$: master server has started\n" if DEBUG;
}
167
168 #------------------------------------------------------------------------------
169 # Accept POE's standard _stop event, and stop all the children, too.
170 # The 'children' hash is maintained in the 'fork' and 'signal'
171 # handlers. It's empty for children.
172
sub _stop {
    my $heap = $_[HEAP];

    # Signal every child server this process still knows about.  The hash
    # is maintained by the 'fork' and 'signal' handlers and is emptied in
    # forked children, so only the master signals anything here.
    foreach my $child_pid (keys %{ $heap->{children} }) {
        DEBUG && print "$$: server is killing child $child_pid ...\n";

        # Send SIGHUP to the child process itself.  A negative signal
        # number (the former "kill -1, $pid") makes Perl's kill() address
        # the process *group* with that id instead of the process, and the
        # forked children are not group leaders, so they were never hit.
        kill('HUP', $child_pid)
            or warn "$$: could not signal child $child_pid: $!\n";
    }

    DEBUG && print "$$: server is stopped\n";
}
182
183 #------------------------------------------------------------------------------
184 # The server has been requested to fork, so fork already.
185
sub fork {
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    # Only the master forks; children ignore this event.
    return if $heap->{'is a child'};

    # CORE:: makes it explicit that this is the builtin, not this handler.
    my $pid = CORE::fork();

    # Case 1: the fork failed.
    if (!defined $pid) {
        my $is_temporary = ($! == EAGAIN) || ($! == ECHILD);

        if ($is_temporary) {
            # Count the failure and try again after a short delay.
            $heap->{'failed forks'}++;
            $kernel->delay('retry', 1);
        }
        else {
            # Anything else is treated as fatal.
            # NOTE(review): posting '_stop' as an ordinary event presumably
            # only runs the handler rather than ending the session -- confirm.
            warn "Can't fork: $!\n";
            $kernel->yield('_stop');
        }
        return;
    }

    # Case 2: we are the freshly forked child server.
    if ($pid == 0) {
        $heap->{'is a child'} = 1;      # don't allow fork
        $heap->{children}     = {};     # don't kill child processes
        $heap->{connections}  = 0;      # limit sessions, then die off

        print "$$: child server has been forked\n" if DEBUG;
        return;
    }

    # Case 3: we are the parent; remember the new child.
    $heap->{children}->{$pid} = 1;
    if (DEBUG) {
        print( "$$: master server forked a new child.  children: (",
               join(' ', keys %{ $heap->{children} }), ")\n" );
    }
}
223
224 #------------------------------------------------------------------------------
225 # Retry failed forks. This is invoked (after a brief delay) if the
226 # 'fork' state encountered a temporary error.
227
sub retry {
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    # A single delayed 'retry' stands in for every fork that failed in the
    # meantime.  Take the count, clear it, then post that many 'fork' events.
    my $owed_forks = $heap->{'failed forks'};
    $heap->{'failed forks'} = 0;

    foreach my $attempt (1 .. $owed_forks) {
        $kernel->yield('fork');
    }
}
240
241 #------------------------------------------------------------------------------
242 # Process signals. SIGCHLD causes this session to fork off a
243 # replacement for the lost child. Terminal signals aren't handled, so
244 # the session will stop on SIGINT. The _stop event handler takes care
245 # of cleanup.
246
sub signal {
    # ARG0 is the signal name; ARG1 is read as the pid of the exited child
    # when the signal is CHLD.
    my ($kernel, $heap, $signal_name, $child_pid) =
        @_[KERNEL, HEAP, ARG0, ARG1];

    # Some operating systems call this SIGCLD.  POE's kernel translates
    # CLD to CHLD, so only the one spelling needs checking.  A replacement
    # is forked only when the pid was one of our tracked children.
    if ($signal_name eq 'CHLD' && delete $heap->{children}->{$child_pid}) {
        if (DEBUG) {
            print( "$$: master caught SIGCHLD.  children: (",
                   join(' ', keys %{ $heap->{children} }), ")\n" );
        }
        $kernel->yield('fork');
    }

    # Returning false leaves terminal signals unhandled.
    return 0;
}
267
268 #------------------------------------------------------------------------------
269 # This state is invoked when the SocketFactory wheel hears a
270 # connection. It creates a new session to handle the connection.
271
sub connection {
    # ARG0..ARG2: the accepted socket, the packed peer address and the
    # peer port, as delivered by the SocketFactory wheel.
    my ($heap, $socket, $peer_addr, $peer_port) = @_[HEAP, ARG0, ARG1, ARG2];

    # Turn the packed address into dotted-quad text for logging.
    $peer_addr = inet_ntoa($peer_addr);
    DEBUG &&
        print "$$: server received a connection from $peer_addr : $peer_port\n";

    # Hand the socket to a new session that handles the transaction.
    PreforkedSession->new($socket, $peer_addr, $peer_port);

    # Diagnostic dump of the server state.  It is tied to DEBUG like every
    # other trace line, and the counter is defaulted because the master
    # never initialises 'connections' (only forked children do), which
    # used to trigger an "uninitialized value" warning under -w.
    if (DEBUG) {
        my $handled = defined $heap->{connections} ? $heap->{connections} : 0;
        print Dumper($heap);
        print "conns: ", $handled, "\n";
    }

    # Stop child sessions after a certain number of connections have
    # been handled.  This enables the program to test SIGCHLD handling
    # and re-forking.  With the threshold at 1, a child drops its
    # listening wheel as soon as its counter reaches one.
    if ($heap->{'is a child'}) {
        if (++$heap->{connections} >= 1) {
            DEBUG && print "delete wheel", "\n";
            delete $heap->{wheel};
        }
    }
}
299
300 ###############################################################################
301 # Start a pre-forked server with a pool of 3 processes (one master and
302 # two forked children), and run them until it's time to exit.
303
package main;

# Total number of server processes, master included.  Pool sizes of 5 and
# 2 were also tried here at some point.
my $pool_size = 3;

# The constructor registers the server session with POE and returns undef.
PreforkedServer->new($pool_size);

print "*** If all goes well, there should be an echo server on port 8888.\n";

# Run the event loop; this returns once no sessions remain.
$poe_kernel->run();

exit 0;

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed