/[cvs]/joko/TestArea/perl/runtime/POE/net/server_preforked.pl
ViewVC logotype

Annotation of /joko/TestArea/perl/runtime/POE/net/server_preforked.pl

Parent Directory | Revision Log


Revision 1.1 - (hide annotations)
Sun May 11 21:48:54 2003 UTC (21 years, 4 months ago) by joko
Branch: MAIN
CVS Tags: HEAD
File MIME type: text/plain
initial commit

1 joko 1.1 #!/usr/bin/perl -w
2     # $Id: preforkedserver.perl,v 1.12 2001/07/24 20:15:35 rcaputo Exp $
3    
4     # This is a proof of concept for pre-forking POE servers. It
5     # maintains a pool of servers: one master plus forked slaves. The
6     # pool size is the argument to PreforkedServer->new at the bottom of
7     # this file. At some point, it would be nice to make the server pool
8     # management a reusable wheel.
8    
9     use strict;
10     use lib '..';
11     use POE qw(Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW Filter::Line);
12    
13     use Data::Dumper;
14    
15     ###############################################################################
16     # This is a pre-forked server's session object. It is given a handle
17     # from the server and processes transactions on it.
18    
19     package PreforkedSession;
20    
21     use strict;
22     use POE::Session;
23    
24     sub DEBUG { 1 }
25    
26     #------------------------------------------------------------------------------
27     # Create the preforked server session, and give it to POE to manage.
28    
sub new {
    # Arguments: class name, the accepted client socket, the peer's
    # address, and the peer's port. The last one used to be called
    # $peer_host, but the caller passes a port and _start reads it as one.
    my ($type, $socket, $peer_addr, $peer_port) = @_;
    my $self = bless { }, $type;

    # Register this object's methods as event handlers. create() is the
    # supported constructor; the older POE::Session->new() interface is
    # deprecated.
    POE::Session->create(
        object_states => [
            $self => [ qw( _start _stop command error flushed ) ],
        ],
        #         ARG0     ARG1        ARG2
        args => [ $socket, $peer_addr, $peer_port ],
    );

    # The session is given to POE to manage rather than returned; the
    # caller receives undef, exactly as before.
    return undef;
}
40    
41     #------------------------------------------------------------------------------
42     # This state accepts POE's standard _start event and starts processing
43     # I/O on the client socket
44    
sub _start {
    my $heap = $_[HEAP];
    my ($socket, $peer_addr, $peer_port) = @_[ARG0, ARG1, ARG2];

    # Keep the peer's address and port around for the log messages.
    @{$heap}{ qw(addr port) } = ($peer_addr, $peer_port);

    # Wrap the client socket in a line-oriented read/write wheel. The
    # wheel is stored on the heap; the other handlers drop it to end the
    # conversation.
    my %wheel_options = (
        Handle       => $socket,                    # the client socket
        Driver       => POE::Driver::SysRW->new,    # sysread and syswrite
        Filter       => POE::Filter::Line->new,     # one record per line
        InputEvent   => 'command',                  # event for each input line
        ErrorEvent   => 'error',                    # event for I/O errors
        FlushedEvent => 'flushed',                  # event once output is flushed
    );
    $heap->{wheel} = POE::Wheel::ReadWrite->new(%wheel_options);

    if (DEBUG) {
        print "$$: handling connection from $heap->{addr} : $heap->{port}\n";
    }
}
63    
64     #------------------------------------------------------------------------------
65     # Accept POE's standard _stop event, and log the close.
66    
sub _stop {
    # Log which client this session was serving, using the address and
    # port that _start saved on the heap.
    my $heap = $_[HEAP];
    if (DEBUG) {
        print "$$: session $heap->{addr} : $heap->{port} has stopped\n";
    }
}
71    
72     #------------------------------------------------------------------------------
73     # This state is invoked by the ReadWrite wheel for each complete
74     # request it receives.
75    
sub command {
    # ARG0 is one complete input line from the client.
    my $heap  = $_[HEAP];
    my $input = $_[ARG0];

    # The whole "protocol": send the line straight back with a prefix.
    my $wheel = $heap->{wheel};
    $wheel->put("Echo: $input");
}
81    
82     #------------------------------------------------------------------------------
83     # This state is invoked when the ReadWrite wheel encounters an I/O
84     # error. It logs the error, and shuts down the session.
85    
sub error {
    my $heap = $_[HEAP];
    my ($operation, $errnum, $errstr) = @_[ARG0, ARG1, ARG2];

    # A zero error number is not reported; only real errors are logged.
    warn "$$: connection with $heap->{addr} : $heap->{port} encountered ",
         "$operation error $errnum: $errstr\n"
        if $errnum;

    # Either way the conversation is over: dropping the wheel stops the
    # session.
    delete $heap->{wheel};
}
97    
98     #------------------------------------------------------------------------------
99     # This state is invoked when the session's response has been flushed
100     # to the socket. Since the "protocol" specifies one transaction per
101     # socket, it shuts down the ReadWrite wheel, ending the session.
102    
sub flushed {
    my $heap = $_[HEAP];

    if (DEBUG) {
        print "$$: response sent to $heap->{addr} : $heap->{port}\n";
    }

    # One transaction per socket: the reply is out, so drop the wheel.
    delete $heap->{wheel};
}
108    
109     ###############################################################################
110     # This is a pre-forked server object. It creates a listening socket,
111     # then forks off many child processes to handle connections. This
112     # differs from the PCB pre-forking server example in that the parent
113     # process continues to accept requests.
114    
115     package PreforkedServer;
116    
117     use strict;
118     use Socket;
119     use POSIX qw(ECHILD EAGAIN);
120     use POE::Session;
121    
122     use Data::Dumper;
123    
124     sub DEBUG { 1 }
125    
126     #------------------------------------------------------------------------------
127     # Create the preforked server, and give it to POE to manage.
128    
sub new {
    # $processes: total pool size, counting the master itself.
    # $port:      optional TCP port to listen on; defaults to 8888.
    my ($type, $processes, $port) = @_;
    $port = 8888 unless defined $port;
    my $self = bless { }, $type;

    # 'error' is registered here because the SocketFactory created in
    # _start reports failures through that event; without a handler,
    # bind/listen errors would go unnoticed. create() replaces the
    # deprecated POE::Session->new() constructor.
    POE::Session->create(
        object_states => [
            $self => [ qw(_start _stop fork retry signal connection error) ],
        ],
        #         ARG0        ARG1
        args => [ $processes, $port ],
    );

    # The session is given to POE to manage; the caller receives undef.
    return undef;
}

#------------------------------------------------------------------------------
# Accept POE's standard _start event, and start the server processes.

sub _start {
    my ($kernel, $heap, $processes, $port) = @_[KERNEL, HEAP, ARG0, ARG1];
    $port = 8888 unless defined $port;

    # create a socket factory
    $heap->{wheel} = POE::Wheel::SocketFactory->new
        ( BindPort     => $port,          # bind on this port
          SuccessEvent => 'connection',   # generate this event for connections
          FailureEvent => 'error'         # generate this event for errors
        );

    # watch for signals
    $kernel->sig('CHLD', 'signal');
    $kernel->sig('INT',  'signal');

    # keep track of children
    $heap->{children}       = {};
    $heap->{'failed forks'} = 0;

    # change behavior for children
    $heap->{'is a child'} = 0;

    # Fork the initial set of children. The range starts at 2 because the
    # master counts as the first process in the pool. yield() posts events
    # to this session, so the forks happen after _start returns.
    for my $slot (2 .. $processes) {
        $kernel->yield('fork');
    }

    DEBUG && print "$$: master server has started\n";
}

#------------------------------------------------------------------------------
# This state is invoked when the SocketFactory wheel reports a failure
# (for example, the port could not be bound). It logs the error and drops
# the listening wheel so this process stops accepting connections.

sub error {
    my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];

    warn "$$: listening socket $operation error $errnum: $errstr\n";
    delete $heap->{wheel};
}
167    
168     #------------------------------------------------------------------------------
169     # Accept POE's standard _stop event, and stop all the children, too.
170     # The 'children' hash is maintained in the 'fork' and 'signal'
171     # handlers. It's empty for children.
172    
sub _stop {
    my $heap = $_[HEAP];

    # Signal every child server still on record. The 'children' hash is
    # empty in child processes, so they skip this loop.
    for my $child_pid (keys %{ $heap->{children} }) {
        DEBUG && print "$$: server is killing child $child_pid ...\n";

        # The signal must be positive. In Perl a negative signal targets
        # the process GROUP with that id, so the former "kill -1, $pid"
        # never reached the individual child. HUP is signal 1, which is
        # what the shell-style "-1" was aiming for.
        kill('HUP', $child_pid)
            or warn "$$: could not signal child $child_pid: $!\n";
    }

    DEBUG && print "$$: server is stopped\n";
}
182    
183     #------------------------------------------------------------------------------
184     # The server has been requested to fork, so fork already.
185    
sub fork {
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    # children should not honor this event
    return if $heap->{'is a child'};

    # Try to fork. The builtin is called as CORE::fork() because this sub
    # is itself named "fork"; an unqualified call still resolves to the
    # builtin, but draws an "Ambiguous call" warning under -w.
    my $pid = CORE::fork();

    # did the fork fail?
    unless (defined $pid) {
        if ($! == EAGAIN || $! == ECHILD) {
            # Temporary error: count it and try again in a second. The
            # 'retry' handler turns the count back into 'fork' events.
            $heap->{'failed forks'}++;
            $kernel->delay('retry', 1);
        }
        else {
            # fail permanently, if fatal
            warn "Can't fork: $!\n";
            # NOTE(review): this posts '_stop' as an ordinary event; confirm
            # that it really shuts the session down on the POE version in use.
            $kernel->yield('_stop');
        }
        return;
    }

    if ($pid) {
        # successful fork; parent keeps track
        $heap->{children}->{$pid} = 1;
        DEBUG &&
            print( "$$: master server forked a new child. children: (",
                   join(' ', keys %{ $heap->{children} }), ")\n"
                 );
    }
    else {
        # child becomes a child server
        $heap->{'is a child'} = 1;      # don't allow fork
        $heap->{children}     = { };    # don't kill child processes
        $heap->{connections}  = 0;      # limit sessions, then die off

        DEBUG && print "$$: child server has been forked\n";
    }
}
223    
224     #------------------------------------------------------------------------------
225     # Retry failed forks. This is invoked (after a brief delay) if the
226     # 'fork' state encountered a temporary error.
227    
sub retry {
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    # Several forks may have failed before this single delayed event
    # fired; post one 'fork' event for each of them.
    my $owed = $heap->{'failed forks'};
    while ($owed-- > 0) {
        $kernel->yield('fork');
    }

    # Everything owed has been re-queued, so start counting from zero.
    $heap->{'failed forks'} = 0;
}
240    
241     #------------------------------------------------------------------------------
242     # Process signals. SIGCHLD causes this session to fork off a
243     # replacement for the lost child. Terminal signals aren't handled, so
244     # the session will stop on SIGINT. The _stop event handler takes care
245     # of cleanup.
246    
sub signal {
    my ($kernel, $heap)  = @_[KERNEL, HEAP];
    my ($signal, $pid)   = @_[ARG0, ARG1];

    # Every path returns 0, leaving terminal signals unhandled.

    # Only SIGCHLD gets special treatment. (POE's kernel translates CLD to
    # CHLD, so checking the one spelling is enough.)
    return 0 unless $signal eq 'CHLD';

    # Ignore children that are not in our table; delete() also removes
    # the entry for one that is.
    return 0 unless delete $heap->{children}->{$pid};

    if (DEBUG) {
        print "$$: master caught SIGCHLD. children: (" .
              join(' ', keys %{ $heap->{children} }) . ")\n";
    }

    # One of ours went away; fork a replacement.
    $kernel->yield('fork');

    return 0;
}
267    
268     #------------------------------------------------------------------------------
269     # This state is invoked when the SocketFactory wheel hears a
270     # connection. It creates a new session to handle the connection.
271    
sub connection {
    # ARG0 is the accepted socket, ARG1 the packed peer address, and ARG2
    # the peer port, as delivered by the SocketFactory's SuccessEvent.
    my ($heap, $socket, $peer_addr, $peer_port) = @_[HEAP, ARG0, ARG1, ARG2];

    # Turn the packed address into dotted-quad text for logging.
    $peer_addr = inet_ntoa($peer_addr);
    DEBUG &&
        print "$$: server received a connection from $peer_addr : $peer_port\n";

    # Hand the socket to a new session that runs the echo conversation.
    PreforkedSession->new($socket, $peer_addr, $peer_port);

    # Diagnostic dump, now gated by DEBUG like every other trace in this
    # file. The master never sets 'connections', so show 0 rather than
    # printing undef and tripping a -w warning.
    if (DEBUG) {
        my $handled = defined $heap->{connections} ? $heap->{connections} : 0;
        print Dumper($heap);
        print "conns: ", $handled, "\n";
    }

    # Only children retire; the master keeps accepting indefinitely.
    return unless $heap->{'is a child'};

    # Stop child sessions after a certain number of connections have been
    # handled. This enables the program to test SIGCHLD handling and
    # re-forking. Dropping the listening wheel is what stops the child
    # from accepting further connections.
    my $connection_limit = 1;
    if (++$heap->{connections} >= $connection_limit) {
        DEBUG && print "delete wheel", "\n";
        delete $heap->{wheel};
    }
}
299    
300     ###############################################################################
301     # Start a pre-forked server, with a pool of 5 processes, and run them
302     # until it's time to exit.
303    
package main;

# The pool size counts the master too, so 3 means one master process plus
# two forked children. (Sizes of 5 and 2 were tried earlier.)
my $pool_size = 3;
PreforkedServer->new($pool_size);

print "*** If all goes well, there should be an echo server on port 8888.\n";

# Run the event loop; run() returns once every session has stopped.
$poe_kernel->run();

exit;

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed