/[cvs]/nfo/perl/libs/POE/Component/LookupClient.pm
ViewVC logotype

Diff of /nfo/perl/libs/POE/Component/LookupClient.pm

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.1 by joko, Sun Jun 29 01:35:29 2003 UTC revision 1.6 by joko, Wed Dec 3 04:01:05 2003 UTC
# Line 2  Line 2 
2  ##  $Id$  ##  $Id$
3  ## ------------------------------------------------------------------------  ## ------------------------------------------------------------------------
4  ##  $Log$  ##  $Log$
5    ##  Revision 1.6  2003/12/03 04:01:05  joko
6    ##  somehow got this to re-connect transparently if server-side goes down
7    ##
8    ##  Revision 1.5  2003/07/01 18:13:15  joko
9    ##  fixed: shutdown and session-unaliasing seems to be done by gc now...?
10    ##
11    ##  Revision 1.3  2003/07/01 13:13:44  joko
12    ##  made "port" and "host" configurable from script
13    ##
14    ##  Revision 1.2  2003/07/01 13:05:01  joko
15    ##  major changes, tried to clean up shutdown phase - the watchdog-mech didn't work out well..... - what's about IKC's monitor? does it work on Linux?
16    ##
17  ##  Revision 1.1  2003/06/29 01:35:29  joko  ##  Revision 1.1  2003/06/29 01:35:29  joko
18  ##  initial commit  ##  initial commit
19  ##  ##
# Line 29  sub new { Line 41  sub new {
41    #my $event_handler = lookupd->new();    #my $event_handler = lookupd->new();
42    POE::Session->create(    POE::Session->create(
43      object_states => [      object_states => [
44        $self => [qw( _start _stop boot_intercom )]        $self => [qw( _start _stop boot_intercom start_session waste_time watchdog )]
45      ]      ]
46    );    );
47    
# Line 71  sub boot_intercom { Line 83  sub boot_intercom {
83    
84    # Client component - encapsulates some session(s) and/or wheel(s)?    # Client component - encapsulates some session(s) and/or wheel(s)?
85        
86    my $host = "localhost";    $self->{options}->{host} ||= "localhost";
87      $self->{options}->{port} ||= 30;
88      
89    #create_ikc_client( host => $host, port => 30, name => 'Client', on_connect => $self->{options}->{on_connect} );    #create_ikc_client( host => $host, port => 30, name => 'Client', on_connect => $self->{options}->{on_connect} );
90    create_ikc_client(    create_ikc_client(
91      host => $host,      ip => $self->{options}->{host},
92      port => 30,      port => $self->{options}->{port},
93      name => 'Client',      #name => 'Client',
94        #on_connect => sub { $self->build(); },
95      on_connect => sub { $self->build(); },      on_connect => sub { $self->build(); },
96      subscribe => [qw( poe://LookupService/ServiceRegistrar/ )],      #subscribe => [qw( poe://LookupService/ServiceRegistrar )],
97    );      #subscribe => [qw( poe://LookupService/ServiceRegistrar )],
98      );
99      
100      #$kernel->post( IKC => 'monitor', '*' => { register => 'start_session' });
101      #$kernel->post( IKC => 'monitor', 'LookupService' => { register => 'start_session' });
102      #$kernel->post( IKC => 'subscribe', [qw( poe://LookupService/ServiceRegistrar )], 'poe:start_session' );
103      
104      # start up the watchdog which monitors the required IKC intercom session
105      #$kernel->yield('waste_time');
106      $kernel->delay('watchdog', 2);
107    
108  };  };
109    
110    sub start_session {
111      #my $self = shift;
112      print STDERR "start_session", "\n";
113    }
114    
115  sub build {  sub build {
116    my $self = shift;    my $self = shift;
117    #print "BUILD", "\n";    #print "build", "\n";
118    # create sessions that depend on the foreign kernel.    # create sessions that depend on the foreign kernel.
119    POE::Component::LookupClient::Session->new();    POE::Component::LookupClient::Session->new();
120  }  }
121    
122    sub watchdog {
123      my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
124      $self->debug("watchdog");
125    
126      #$kernel->post( IKC => 'monitor', '*' => { register => 'start_session' });
127    
128      if (not defined $kernel->alias_resolve('DeviceClient')) {
129      #if (not defined $kernel->alias_resolve('IKC')) {
130        print STDERR "Session died, trying to restart!", "\n";
131        $kernel->yield('boot_intercom');
132        return;
133      }
134      
135      $kernel->delay('watchdog', 2);
136    };
137    
138    #------------------------------------------------------------------------------
139    # This event keeps this POE kernel alive.
140    # (stolen from POE::Component::IKC::Server, but not used 'til now...)
141    sub waste_time {
142        my($kernel, $heap)=@_[KERNEL, HEAP];
143        return if $heap->{'is a child'};
144    
145        unless($heap->{'been told we are parent'}) {
146            warn "$$: Telling everyone we are the parent\n";
147            $heap->{'been told we are parent'}=1;
148            $kernel->signal($kernel, '__parent');
149        }
150        if($heap->{'die'}) {
151            #DEBUG and warn "$$: Orderly shutdown\n";
152        } else {
153            $kernel->yield('watchdog');
154            $kernel->delay('waste_time', 60);
155        }
156        return;
157    }
158    
159    
160    
161    
162  package POE::Component::LookupClient::Session;  package POE::Component::LookupClient::Session;
# Line 112  sub new { Line 179  sub new {
179    
180    POE::Session->create(    POE::Session->create(
181      object_states => [      object_states => [
182        $self => [qw( _start _stop response register_lease renew_lease )]        $self => [qw( _start _stop on_response on_subscribe register_lease renew_lease remote_shutdown remote_timeout )]
183      ]      ]
184    );    );
185    
# Line 135  sub _start { Line 202  sub _start {
202    
203    #$kernel->alias_set('');    #$kernel->alias_set('');
204    #$kernel->post();    #$kernel->post();
205      
206      #sub POE::Component::IKC::Responder::DEBUG { 1 }
207      #sub POE::Component::IKC::Responder::Object::DEBUG { 1 }
208    
209    # set up communication channel for asynchronous responses    # set up communication channel for asynchronous responses
210    $kernel->alias_set('DeviceClient');    $kernel->alias_set('DeviceClient');
211    $kernel->post('IKC', 'publish', 'DeviceClient', [qw( response )]);    $kernel->post('IKC', 'publish', 'DeviceClient', [qw( on_response )]);
212      
213    # try to register on startup    #$kernel->post( IKC => 'subscribe', [qw( poe://LookupService/ServiceRegistrar )], 'poe:start_session' );
214    $kernel->yield('register_lease');  
215      $kernel->post( IKC => 'subscribe', [qw( poe://LookupService/ServiceRegistrar )], 'on_subscribe' );
216    
217      # try to register on startup?
218      #$kernel->yield('register_lease');
219        
220  };  };
221    
222  sub _stop {  sub _stop {
223    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
224    $self->debug("_stop");    $self->debug("_stop");
225    
226      # try to re-register if session dies?
227      #$kernel->yield('register_lease');
228      
229      #$kernel->alias_remove('DeviceClient');
230    
231      #$self = undef;
232  };  };
233    
234    
235    # Subscription receipt callback, see "perldoc POE::Component::IKC::Responder".
236    sub on_subscribe {
237      my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
238      $self->debug("on_subscribe");
239      # register lease on subscription
240      $kernel->yield('register_lease');
241    }
242    
243  # Main response dispatcher, this should dispatch to local states programmatically.  # Main response dispatcher, this should dispatch to local states programmatically.
244  sub response {  sub on_response {
245    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
246    #$self->debug("response");    #$self->debug("response");
247        
248      $heap->{'cancel timeout'} = 1;
249      
250    my $payload = $_[ARG0];    my $payload = $_[ARG0];
251    $payload ||= '';    $payload ||= '';
252        
253    # If registration succeeded, start the renewal cycle.    # If registration succeeded, start the renewal cycle.
254    if ($payload eq 'REG_OK') {    if ($payload eq 'REG_OK') {
255      $self->debug("Starting lease loop.");      $self->debug("Starting lease renewal loop.");
256        $kernel->post( IKC => 'monitor', '*' => { unregister => 'remote_shutdown' });
257        $kernel->post( IKC => 'monitor', '*' => { shutdown => 'remote_shutdown' });
258      $kernel->yield( 'renew_lease' );      $kernel->yield( 'renew_lease' );
259    
260    } elsif ($payload eq 'LEASE_OK') {    } elsif ($payload eq 'LEASE_OK') {
# Line 172  sub response { Line 266  sub response {
266    } else {    } else {
267      #print Dumper($payload);      #print Dumper($payload);
268      $heap->{'destroy lease'} = 1;      $heap->{'destroy lease'} = 1;
269        #$kernel->alarm_remove_all();
270        
271    }    }
272        
# Line 180  sub response { Line 275  sub response {
275  sub register_lease {  sub register_lease {
276    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
277    $self->debug("register_lease");    $self->debug("register_lease");
278      
279    my $ONE_arg = "Hello World!";    my $ONE_arg = "Hello World!";
280        
281    # V1 - without subscription    # V1 - without subscription
282    #$kernel->post('IKC', 'post', "poe://LookupService/ServiceRegistrar/register_lease", $ONE_arg);    #$kernel->post('IKC', 'post', "poe://LookupService/ServiceRegistrar/register_lease", $ONE_arg);
283    # V2 - with subscription    # V2 - with subscription
284    $kernel->post( "poe://LookupService/ServiceRegistrar", "register_lease", $ONE_arg);    $kernel->post( "poe://LookupService/ServiceRegistrar", "register_lease", $ONE_arg );
285  }  }
286    
287  sub renew_lease {  sub renew_lease {
288    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
289        
290    if ($heap->{'destroy lease'}) {    if ($heap->{'destroy lease'}) {
291      $heap->{'destroy lease'} = 0;      #$heap->{'destroy lease'} = 0;
292        #$kernel->delay_set('remote_timeout');
293      $self->debug("destroyed lease");      $self->debug("destroyed lease");
294        #undef $self;
295        #$kernel->alias_remove('DeviceClient');
296        #undef $_[SESSION];
297        #$kernel->post( 'IKC' => 'shutdown' );
298        # clear delayed posts
299        #$kernel->delay('renew_lease');
300        #$kernel->delay('remote_timeout');
301      return;      return;
302    }    }
303        
304    $self->debug("renew_lease");    $self->debug("renew_lease");
305      
306      # check if remote kernel(s) are still around
307      #$kernel->post('IKC', 'call', 'poe://LookupService/IKC/ping', '', 'poe:remote_timeout');
308      #$kernel->post('IKC', 'call', 'poe://LookupService/IKC/ping', undef, 'poe:remote_timeout');
309      #$kernel->post('poe://remote/IKC', 'ping', 'poe:remote_timeout');
310      #$kernel->delay('remote_timeout', 5);   # timeout
311    
312    my $ONE_arg = '';    my $ONE_arg = '';
313    #$kernel->post('IKC', 'post', "poe://LookupService/ServiceRegistrar/renew_lease", $ONE_arg);    #$kernel->post('IKC', 'post', "poe://LookupService/ServiceRegistrar/renew_lease", $ONE_arg);
314    # V1 - without subscription    # V1 - without subscription
315    $kernel->post('IKC', 'call', "poe://LookupService/ServiceRegistrar/renew_lease", $ONE_arg, 'poe:response');    $kernel->post('IKC', 'call', "poe://LookupService/ServiceRegistrar/renew_lease", $ONE_arg, 'poe:on_response');
316      
317    # V2 - with subscription    # V2 - with subscription
318    #my $resp = $kernel->call( "poe://LookupService/ServiceRegistrar", "renew_lease", $ONE_arg);    #my $resp = $kernel->call( "poe://LookupService/ServiceRegistrar", "renew_lease", $ONE_arg);
319    #print $resp, "\n";    #print $resp, "\n";
320      
321      # V3 - have we been able to post?
322      #my $resp = $kernel->call('IKC', 'call', "poe://LookupService/ServiceRegistrar/renew_lease", $ONE_arg, 'poe:response');
323      #print "resp: $resp", "\n";
324    
325    # and again...    # and again...
326    $kernel->delay('renew_lease', 15);    $kernel->delay('renew_lease', 5);
327      #$kernel->delay_set('renew_lease', 15);
328      #$kernel->delay_add('renew_lease', 15);
329    
330      # timeout!?
331      #$kernel->delay('remote_timeout', 20);
332      $kernel->delay_add('remote_timeout', 7);
333      #$kernel->delay_set('remote_timeout', 5);
334      #$kernel->delay_set('remote_timeout', 20);
335    
336    }
337    
338    sub remote_shutdown {
339      my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
340      $self->debug("remote_shutdown");
341    }
342    
343    sub remote_timeout {
344      
345      #my ($pong) = $_[ARG0];
346      #return if $pong;            # all is cool
347      
348      my ($self, $kernel, $heap) = @_[OBJECT, KERNEL, HEAP];
349    
350    #=pod
351      if ($heap->{'cancel timeout'}) {
352        $heap->{'cancel timeout'} = 0;
353        return;
354      }
355    #=cut
356    
357      $self->debug("remote_timeout");
358    
359      $heap->{'destroy lease'} = 1;
360    
361      # YOW!  Remote kernel timed out.  RUN AROUND SCREAMING!
362      print STDERR "# YOW!  Remote kernel timed out.  RUN AROUND SCREAMING!", "\n";
363    
364      # free all resources that keep this session running
365      $kernel->delay('renew_lease');
366      $kernel->delay('remote_timeout');
367      
368      #$kernel->post( 'IKC' => 'shutdown' );
369      
370      #$kernel->alias_remove('DeviceClient');
371    
372      #$kernel->alias_remove('DeviceClient');
373      #$kernel->yield('renew_lease');
374    
375      #sub POE::Component::IKC::Responder::DEBUG { return 1; }
376      #$kernel->post( 'IKC' => 'unregister' );
377      
378      #print Dumper($kernel->alias_list());
379    
380      $kernel->post( IKC => 'retract', 'DeviceClient' => [qw( on_response )]);
381      #$kernel->post( IKC => 'retract', 'me' => [qw( on_response )]);
382      $kernel->post( IKC => 'unsubscribe', [qw( poe://LookupService/ServiceRegistrar )]);
383      $kernel->post( IKC => 'unregister', [qw( poe://LookupService )]);
384      #$kernel->post( IKC => 'unsubscribe', [qw( poe://me )]);
385      #$kernel->post( IKC => 'unregister', [qw( poe://me )]);
386      #$kernel->run_one_timeslice();
387      #$kernel->run_one_timeslice();
388      #$kernel->run_one_timeslice();
389      #return;
390      
391      #$kernel->post( 'IKC' => 'shutdown' );
392      #$kernel->post( 'IKC' => 'blah' );
393      #$kernel->run_one_timeslice();
394      #$kernel->run_one_timeslice();
395    
396      $kernel->post('DeviceClient', 'stop');
397      
398      $kernel->alias_remove('DeviceClient');
399      $kernel->alias_remove('IKC');
400      
401      #$kernel->run_one_timeslice();
402      #$kernel->run_one_timeslice();
403      
404      #$kernel->yield('_start');
405    
406  }  }
407    
408  1;  1;

Legend:
Removed from v.1.1  
changed lines
  Added in v.1.6

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed