/[cvs]/nfo/perl/libs/Data/Transfer/Sync/Core.pm
ViewVC logotype

Annotation of /nfo/perl/libs/Data/Transfer/Sync/Core.pm

Parent Directory | Revision Log


Revision 1.8 - (hide annotations)
Thu Mar 27 15:31:15 2003 UTC (21 years, 3 months ago) by joko
Branch: MAIN
Changes since 1.7: +6 -2 lines
fixes to modules regarding new namespace(s) below Data::Mungle::*

1 joko 1.8 ## $Id: Core.pm,v 1.7 2003/02/21 08:01:11 joko Exp $
2 joko 1.1 ##
3     ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4     ##
5     ## See COPYRIGHT section in pod text below for usage and distribution rights.
6     ##
7     ## ----------------------------------------------------------------------------------------
8 joko 1.2 ## $Log: Core.pm,v $
9 joko 1.8 ## Revision 1.7 2003/02/21 08:01:11 joko
10     ## debugging, logging
11     ## renamed module
12     ##
13 joko 1.7 ## Revision 1.6 2003/02/14 14:03:49 joko
14     ## + logging, debugging
15     ## - refactored code to sister module
16     ##
17 joko 1.6 ## Revision 1.5 2003/02/11 05:30:47 joko
18     ## + minor fixes and some debugging mud
19     ##
20 joko 1.5 ## Revision 1.4 2003/02/09 05:01:10 joko
21     ## + major structure changes
22     ## - refactored code to sister modules
23     ##
24 joko 1.4 ## Revision 1.3 2003/01/20 17:01:14 joko
25     ## + cosmetics and debugging
26     ## + probably refactored code to new plugin-modules 'Metadata.pm' and/or 'StorageInterface.pm' (guess it was the last one...)
27     ##
28 joko 1.3 ## Revision 1.2 2003/01/19 02:05:42 joko
29     ## - removed pod-documentation: now in Data/Transfer/Sync.pod
30     ##
31 joko 1.2 ## Revision 1.1 2003/01/19 01:23:04 joko
32     ## + new from Data/Transfer/Sync.pm
33     ##
34 joko 1.1 ## Revision 1.11 2002/12/23 07:10:59 joko
35     ## + using MD5 for checksum generation again - the 32-bit integer hash from DBI seems to be too lazy
36     ##
37     ## Revision 1.10 2002/12/19 01:07:16 joko
38     ## + fixed output done via $logger
39     ##
40     ## Revision 1.9 2002/12/16 07:02:34 jonen
41     ## + added comment
42     ##
43     ## Revision 1.8 2002/12/15 02:03:09 joko
44     ## + fixed logging-messages
45     ## + additional metadata-checks
46     ##
47     ## Revision 1.7 2002/12/13 21:49:34 joko
48     ## + sub configure
49     ## + sub checkOptions
50     ##
51     ## Revision 1.6 2002/12/06 04:49:10 jonen
52     ## + disabled output-buffer here
53     ##
54     ## Revision 1.5 2002/12/05 08:06:05 joko
55     ## + bugfix with determining empty fields (Null) with DBD::CSV
56     ## + debugging
57     ## + updated comments
58     ##
59     ## Revision 1.4 2002/12/03 15:54:07 joko
60     ## + {import}-flag is now {prepare}-flag
61     ##
62     ## Revision 1.3 2002/12/01 22:26:59 joko
63     ## + minor cosmetics for logging
64     ##
65     ## Revision 1.2 2002/12/01 04:43:25 joko
66     ## + mapping detail entries may now be either an ARRAY or a HASH
67     ## + erase flag is used now (for export-operations)
68     ## + expressions to refer to values inside deep nested structures
69     ## - removed old mappingV2-code
70     ## + cosmetics
71     ## + sub _erase_all
72     ##
73     ## Revision 1.1 2002/11/29 04:45:50 joko
74     ## + initial check in
75     ##
76     ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
77     ## + new
78     ## ----------------------------------------------------------------------------------------
79    
80    
81     package Data::Transfer::Sync::Core;
82    
83     use strict;
84     use warnings;
85    
86     use mixin::with qw( Data::Transfer::Sync );
87    
88    
89     # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - main
90    
91     use Data::Dumper;
92    
93 joko 1.7 #use misc::HashExt;
94     use Hash::Serializer;
95 joko 1.8 use Data::Mungle::Compare::Struct qw( getDifference isEmpty );
96 joko 1.1 use Data::Storage::Container;
97     use DesignPattern::Object;
98 joko 1.5 use libdb qw( quotesql );
99 joko 1.1
# Module-wide logger handle used by every sub below.
# NOTE(review): Log::Dispatch::Config is not loaded by this file - presumably
# the application loads and configures it beforehand; confirm.
my $logger = Log::Dispatch::Config->instance();

# Switch the currently selected output handle to autoflush so the
# single-character progress markers printed during a run appear immediately.
$| = 1;
104    
105     =pod
106     sub new {
107     my $invocant = shift;
108     my $class = ref($invocant) || $invocant;
109     my $self = {};
110     $logger->debug( __PACKAGE__ . "->new(@_)" );
111     bless $self, $class;
112     $self->configure(@_);
113     return $self;
114     }
115     =cut
116    
117    
# Prepare the storage container of this synchronisation object.
#
# Creates a Data::Storage::Container when $self->{container} is not set yet and
# registers every entry of $self->{storages} (name => storage) with it via
# addStorage(). Always returns 1.
sub _init {
    my ($self) = @_;

    # build new container if necessary
    $self->{container} ||= Data::Storage::Container->new();

    # add storages to container (optional)
    for my $name (keys %{ $self->{storages} }) {
        $self->{container}->addStorage($name, $self->{storages}->{$name});
    }

    return 1;
}
136    
# Obsolete initialiser: logs the call and then dies unconditionally, so the
# tagging statements after the die() are never executed. They are kept as a
# record of the old behaviour.
sub _initV1 {
    my ($self) = @_;

    $logger->debug( __PACKAGE__ . "->_initV1" );
    die("this should not be reached!");

    # tag storages with id-authority and checksum-provider information
    # TODO: better store tag inside metadata to hold bits together!
    for my $name (@{ $self->{id_authorities} }) {
        $self->{container}->{storage}->{$name}->{isIdentAuthority} = 1;
    }
    for my $name (@{ $self->{checksum_authorities} }) {
        $self->{container}->{storage}->{$name}->{isChecksumAuthority} = 1;
    }
    for my $name (@{ $self->{write_protected} }) {
        $self->{container}->{storage}->{$name}->{isWriteProtected} = 1;
    }
}
147    
148    
149     # TODO: abstract the hardwired use of "source" and "target" in here somehow - hmmmm....... /(="§/%???
# Synchronise the nodes of the 'source' side to the 'target' side.
#
# Everything is read from $self:
#   {args}{objectSet}               - optional, ready-made array ref of source nodes
#   {meta}{source}{filter}          - optional filter passed to _getNodeList()
#   {meta}{source}{Callback}{read}  - optional per-property read callbacks; for a
#                                     property P the function "<nodeType>::P_read"
#                                     is called and its result stored in the node
#   {meta}{source}{subnodes_exclude}- optional list of properties to drop
#   {options}{metadata}{syncMethod} - required; only 'checksum' is implemented
#   {args}{force}                   - mark already existing nodes as dirty
#
# Returns the statistics object (counters: total, skip, in_sync, attempt_new,
# attempt_modify, attempt_transfer, ok, error; list: error_per_row).
# Returns nothing when there are no nodes, when no syncMethod is configured,
# or as soon as a source node has no resolvable ident.
#
# Progress characters printed while {verbose} is set:
#   ":" node started      "n" node not found in target   "c" checksums read
#   "s" node skipped      "t" transfer ok                "e" transfer error
#   "r" ident/checksum written back to the source
sub _run {

    my $self = shift;

    # NOTE(review): verbose output is forced on here, overriding whatever the
    # caller configured - looks like a debugging leftover; confirm before removing.
    $self->{verbose} = 1;

    $logger->debug( __PACKAGE__ . "->_run" );

    # for statistics
    my $tc = Hash::Serializer->new( {} );
    my $results;

    # set of objects is already in $self->{args}
    # TODO: make independent of the terminology "object..."
    $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};

    # apply filter
    if (my $filter = $self->{meta}->{source}->{filter}) {
        $results ||= $self->_getNodeList('source', $filter);
    }

    # get reference to node list from convenient method provided by CORE-HANDLE
    $results ||= $self->_getNodeList('source');

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@{$results}) {
        $logger->notice( __PACKAGE__ . "->_run: No nodes to synchronize." );
        return;
    }

    # check if we actually *have* a synchronization method
    if (!$self->{options}->{metadata}->{syncMethod}) {
        $logger->critical( __PACKAGE__ . "->_run: No synchronization method (checksum|timestamp) specified" );
        return;
    }

    # dereference (iterate over a copy of the list)
    my @results = @{$results};

    # iterate through set
    foreach my $source_node_real (@results) {

        print ":" if $self->{verbose};

        $tc->{total}++;

        # No clone is made: the node is used by reference. If a deep copy is
        # ever needed because of the modifications occurring below, do it here
        # (a deep copy might interfere with the object-persistence layer).
        my $source_node = $source_node_real;

        my $descent = 'source';

        # handle read callbacks right now while scanning them (asymmetric to the writers)
        if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {

            foreach my $node (keys %{$callbacks->{read}}) {

                # ------------ half-redundant: make $self->callCallback($object, $value, $opts)
                my $perl_callback = $self->{meta}->{$descent}->{nodeType} . '::' . $node . '_read';

                # Call the callback through a code reference inside a block
                # eval instead of compiling a source string per property:
                # same call, same arguments, but no code is built from metadata.
                # (Taking \&{$name} from a string is permitted under "use strict".)
                my $cb_result = eval {
                    my $code = \&{$perl_callback};
                    $code->( {
                        object   => $source_node,
                        property => $node,
                        value    => undef,
                        storage  => $self->{meta}->{$descent}->{storage},
                    } );
                };
                if ($@) {
                    $logger->error( __PACKAGE__ . "->_run: $@" );
                    next;
                }
                # ------------ half-redundant: make $self->callCallback($object, $value, $opts)

                $source_node->{$node} = $cb_result;

            }

        }

        # exclude defined fields (simply delete from object)
        foreach my $excluded (@{$self->{meta}->{source}->{subnodes_exclude}}) {
            delete $source_node->{$excluded};
        }

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = {};
        $self->{node}->{source}->{payload} = $source_node;

        # determine ident of entry
        my $identOK = $self->_resolveNodeIdent('source');
        if (!$identOK) {
            $logger->critical( __PACKAGE__ . "->_run: No ident found in source node ( nodeName='$self->{meta}->{source}->{nodeName}', nodeType='$self->{meta}->{source}->{nodeType}') try to \"prepare\" this node first?" );
            return;
        }

        my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

        # mark node as new either if there's no ident or if stat/load failed
        if (!$statOK) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $self->{verbose};
        }

        # determine status of entry by synchronization method
        if ( lc $self->{options}->{metadata}->{syncMethod} eq 'checksum' ) {

            # read checksum of source node
            if (!$self->_readChecksum('source')) {
                $logger->warning( __PACKAGE__ . "->_run: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $self->{verbose};
                next;
            }

            # get checksum from synchronization target (a missing one is not an error)
            $self->_readChecksum('target');

            # determine if entry is "new" or "dirty"
            # after all, this seems to be the point where the hammer falls.....
            print "c" if $self->{verbose};

            # a node without target checksum counts as new ...
            $self->{node}->{status}->{new} = !$self->{node}->{target}->{checksum};
            # ... an existing one is dirty when checksums are missing or differ, or when forced
            if (!$self->{node}->{status}->{new}) {
                $self->{node}->{status}->{dirty} =
                    $self->{node}->{status}->{new} ||
                    (!$self->{node}->{source}->{checksum} || !$self->{node}->{target}->{checksum}) ||
                    ($self->{node}->{source}->{checksum} ne $self->{node}->{target}->{checksum}) ||
                    $self->{args}->{force};
            }

        } else {
            $logger->warning( __PACKAGE__ . "->_run: Synchronization method '$self->{options}->{metadata}->{syncMethod}' is not implemented" );
            $tc->{skip}++;
            print "s" if $self->{verbose};
            next;

        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next;
        }

        # build map to actually transfer the data from source to target
        $self->buildAttributeMap();

        # additional (new) checks for feature "write-protection"
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $self->{verbose};
            $logger->notice( __PACKAGE__ . "->_run: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $self->{verbose};
            $tc->{skip}++;
            next;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->_readChecksum('target');

        } elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}};
            $self->_doTransferToTarget('update');
            $self->_readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $self->{verbose};
        }

        if ($self->{node}->{status}->{error}) {
            $tc->{error}++;
            push( @{$tc->{error_per_row}}, $self->{node}->{status}->{error} );
            print "e" if $self->{verbose};
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{isIdentAuthority}) {
            print "r" if $self->{verbose};
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
        }

    }

    print "\n" if $self->{verbose};

    # build user-message from some stats
    # NOTE(review): relies on the statistics object stringifying to something readable - confirm.
    my $msg = "statistics: $tc";

    if ($tc->{error_per_row}) {
        $msg .= "\n";
        $msg .= "errors from \"error_per_row\":" . "\n";
        $msg .= Dumper($tc->{error_per_row});
    }

    # todo!!!
    #sysevent( { usermsg => $msg, level => $level }, $taskEvent );
    $logger->info($msg . "\n");

    return $tc;

}
438    
439    
440     # refactor this as some core-function to do a generic dump resolving data-encapsulations of e.g. Set::Object
# Render hash references as one compact, single-line Data::Dumper string.
#
# Parameters (after $self): zero or more hash references.
# Returns: the scalar Dumper() output for shallow copies of the arguments,
#          produced with $Data::Dumper::Indent set to 0.
#
# A value whose class is exactly 'Set::Object' is replaced in the copy by an
# array reference holding the list returned by its members() method; all other
# values are copied unchanged. The arguments themselves are not modified.
sub _dumpCompact {
    my $self = shift;

    my @data = ();

    foreach my $record (@_) {
        my $item = {};
        foreach my $key (keys %$record) {
            my $val = $record->{$key};

            if (ref $val eq 'Set::Object') {
                # members() yields a list; assigning the call straight to a
                # hash element would evaluate it in scalar context and lose
                # the list, so collect it into an array reference instead.
                $item->{$key} = [ $val->members() ];

            } else {
                $item->{$key} = $val;
            }

        }
        push @data, $item;
    }

    # "local" restores the caller's Indent setting on exit (also when Dumper
    # dies) instead of unconditionally forcing it to 2 afterwards.
    local $Data::Dumper::Indent = 0;

    # Keep the scalar assignment: in list context Dumper() would return one
    # string per argument instead of a single concatenated string.
    my $result = Dumper(@data);
    return $result;

}
482    
483    
484    
# Write the attribute map of the current node ($self->{node}->{map}) to the
# 'target' side.
#
# Parameters:
#   $action - passed through unchanged to _modifyNode(); _run() uses
#             'insert' and 'update'
# Returns whatever _modifyNode() returns.
sub _doTransferToTarget {
    my ($self, $action) = @_;

    return $self->_modifyNode('target', $action, $self->{node}->{map});
}
496 joko 1.4
497 joko 1.1
# Write a new ident together with the target's checksum back to the source node.
#
# Parameters:
#   $ident_new - value stored under the source IdentProvider's argument name
# The checksum is taken from $self->{node}->{target}->{checksum} and stored
# under the hard-coded key 'cs'. Returns whatever _modifyNode() returns.
#
# TODO:
#   - eventually introduce an external resource to store this data to
#   - we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
    my ($self, $ident_new) = @_;

    my $ident_key = $self->{meta}->{source}->{IdentProvider}->{arg};
    my %map = (
        $ident_key => $ident_new,
        cs         => $self->{node}->{target}->{checksum},
    );

    return $self->_modifyNode('source', 'update', \%map);
}
516    
517    
518    
# Check the node behind $descent for the meta properties used by the sync and
# try to add them to the underlying table.
#
# Parameters:
#   $descent - key into $self->{meta} selecting the side to prepare
#
# Only the first entry of _getNodeList($descent) is inspected. Its keys are
# compared with the required names - the IdentProvider argument and the
# hard-coded 'cs' - via getDifference()/isEmpty(). When the check fires, one
# "ALTER TABLE ... ADD COLUMN ..." command per required name is sent to the
# storage of that side.
#
# NOTE(review): the ALTER statements run when isEmpty($diff) is TRUE; whether
# that really means "properties are missing" depends on what getDifference()
# returns - verify against Data::Mungle::Compare::Struct.
# NOTE(review): the table name comes from {meta}{$descent}{node}, while other
# code in this file reads {nodeName}/{nodeType} - confirm this key is still set.
sub _prepareNode_MetaProperties {
    my ($self, $descent) = @_;

    $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

    # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
    my $list      = $self->_getNodeList($descent);
    my $firstnode = $list->[0];

    # check if node contains meta properties/nodes
    # TODO: "cs" is hardcoded here!
    my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
    my @found    = keys %{$firstnode};
    my $diff     = getDifference(\@required, \@found);

    if (isEmpty($diff)) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
        for my $column (@required) {
            my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $column";
            # result is currently not evaluated
            my $res = $self->{meta}->{$descent}->{storage}->sendCommand($sql);
        }
    }

}
552    
# Give every node of the $descent side a dummy ident and an undefined checksum.
#
# Parameters:
#   $descent - key into $self->{meta} selecting the side to prepare
#
# For each entry of _getNodeList($descent) an update map
#   { <IdentProvider arg> => <dummy ident>, cs => undef }
# is built. The dummy ident is (5678983 + number of nodes updated so far)
# followed by the string '0001'. The row to update is addressed by a criteria
# string "col='value' AND ..." built from the names getDifference() returns for
# (node keys, map keys); nodes for which that list is empty are skipped.
# Logs a warning when no node was updated at all.
#
# NOTE(review): names and values that are false ('', 0, undef) are left out of
# the criteria. If every value is left out the criteria string is empty; what
# _modifyNode() does with an empty criteria is not visible here - confirm.
sub _prepareNode_DummyIdent {
    my ($self, $descent) = @_;

    $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

    my $list = $self->_getNodeList($descent);

    my $ident_base     = 5678983;
    my $ident_appendix = '0001';
    my $i              = 0;    # number of nodes updated so far

    foreach my $node (@$list) {

        my $ident_dummy = ($i + $ident_base) . $ident_appendix;
        my $map = {
            $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
            cs => undef,
        };

        # diff lists and ...
        my $diff = getDifference([keys %$node], [keys %$map]);
        next if $#{$diff} == -1;

        # ... build criteria including all columns
        my @crits;
        for my $property (grep { $_ } @$diff) {
            my $value = $node->{$property} or next;
            push @crits, "$property='" . quotesql($value) . "'";
        }
        my $crit = join ' AND ', @crits;

        print "p" if $self->{verbose};

        $self->_modifyNode($descent, 'update', $map, $crit);
        $i++;
    }

    if (!$i) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
    }

}
601    
602     # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
# Map one side of the synchronisation to its counterpart.
#
# Parameters:
#   $descent - 'source' or 'target'
# Returns 'target' for 'source', 'source' for 'target' and the empty string
# for anything else.
sub _otherSide {
    my ($self, $descent) = @_;

    return $descent eq 'target' ? 'source'
         : $descent eq 'source' ? 'target'
         :                        '';
}
610    
611    
612     1;
613 joko 1.6 __END__

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed