/[cvs]/nfo/perl/libs/Data/Transfer/Sync/Core.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync/Core.pm

Parent Directory | Revision Log


Revision 1.12 - (show annotations)
Sat Jun 19 01:45:08 2004 UTC (20 years ago) by joko
Branch: MAIN
Changes since 1.11: +43 -60 lines
introduced "local checksum"-mechanism
moved _dumpCompact to ::Compare::Checksum

1 ## -------------------------------------------------------------------------
2 ##
3 ## $Id: Core.pm,v 1.11 2004/05/11 20:03:48 jonen Exp $
4 ##
5 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
6 ##
7 ## See COPYRIGHT section in pod text below for usage and distribution rights.
8 ##
9 ## -------------------------------------------------------------------------
10 ## $Log: Core.pm,v $
11 ## Revision 1.11 2004/05/11 20:03:48 jonen
12 ## bugfix[joko] related to Attribute Map
13 ##
14 ## Revision 1.10 2003/06/25 23:03:57 joko
15 ## no debugging
16 ##
17 ## Revision 1.9 2003/05/13 08:17:52 joko
18 ## buildAttributeMap now propagates error
19 ##
20 ## Revision 1.8 2003/03/27 15:31:15 joko
21 ## fixes to modules regarding new namespace(s) below Data::Mungle::*
22 ##
23 ## Revision 1.7 2003/02/21 08:01:11 joko
24 ## debugging, logging
25 ## renamed module
26 ##
27 ## Revision 1.6 2003/02/14 14:03:49 joko
28 ## + logging, debugging
29 ## - refactored code to sister module
30 ##
31 ## Revision 1.5 2003/02/11 05:30:47 joko
32 ## + minor fixes and some debugging mud
33 ##
34 ## Revision 1.4 2003/02/09 05:01:10 joko
35 ## + major structure changes
36 ## - refactored code to sister modules
37 ##
38 ## Revision 1.3 2003/01/20 17:01:14 joko
39 ## + cosmetics and debugging
40 ## + probably refactored code to new plugin-modules 'Metadata.pm' and/or 'StorageInterface.pm' (guess it was the last one...)
41 ##
42 ## Revision 1.2 2003/01/19 02:05:42 joko
43 ## - removed pod-documentation: now in Data/Transfer/Sync.pod
44 ##
45 ## Revision 1.1 2003/01/19 01:23:04 joko
46 ## + new from Data/Transfer/Sync.pm
47 ##
48 ## Revision 1.11 2002/12/23 07:10:59 joko
49 ## + using MD5 for checksum generation again - the 32-bit integer hash from DBI seems to be too lazy
50 ##
51 ## Revision 1.10 2002/12/19 01:07:16 joko
52 ## + fixed output done via $logger
53 ##
54 ## Revision 1.9 2002/12/16 07:02:34 jonen
55 ## + added comment
56 ##
57 ## Revision 1.8 2002/12/15 02:03:09 joko
58 ## + fixed logging-messages
59 ## + additional metadata-checks
60 ##
61 ## Revision 1.7 2002/12/13 21:49:34 joko
62 ## + sub configure
63 ## + sub checkOptions
64 ##
65 ## Revision 1.6 2002/12/06 04:49:10 jonen
66 ## + disabled output-puffer here
67 ##
68 ## Revision 1.5 2002/12/05 08:06:05 joko
69 ## + bugfix with determining empty fields (Null) with DBD::CSV
70 ## + debugging
71 ## + updated comments
72 ##
73 ## Revision 1.4 2002/12/03 15:54:07 joko
74 ## + {import}-flag is now {prepare}-flag
75 ##
76 ## Revision 1.3 2002/12/01 22:26:59 joko
77 ## + minor cosmetics for logging
78 ##
79 ## Revision 1.2 2002/12/01 04:43:25 joko
80 ## + mapping detail entries may now be either an ARRAY or a HASH
81 ## + erase flag is used now (for export-operations)
82 ## + expressions to refer to values inside deep nested structures
83 ## - removed old mappingV2-code
84 ## + cosmetics
85 ## + sub _erase_all
86 ##
87 ## Revision 1.1 2002/11/29 04:45:50 joko
88 ## + initial check in
89 ##
90 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
91 ## + new
92 ## -------------------------------------------------------------------------
93
94
95 package Data::Transfer::Sync::Core;
96
97 use strict;
98 use warnings;
99
100 use mixin::with qw( Data::Transfer::Sync );
101
102
103 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - main
104
105 use Data::Dumper;
106
107 #use misc::HashExt;
108 use Hash::Serializer;
109 use Data::Mungle::Compare::Struct qw( getDifference isEmpty );
110 use Data::Mungle::Transform::Deep qw( deep_copy expand );
111 use Data::Storage::Container;
112 use DesignPattern::Object;
113 use shortcuts::database qw( quotesql );
114
115 # get logger instance
116 my $logger = Log::Dispatch::Config->instance;
117
118 $| = 1;
119
120 =pod
121 sub new {
122 my $invocant = shift;
123 my $class = ref($invocant) || $invocant;
124 my $self = {};
125 $logger->debug( __PACKAGE__ . "->new(@_)" );
126 bless $self, $class;
127 $self->configure(@_);
128 return $self;
129 }
130 =cut
131
132
# _init()
#
# Prepares the storage container used by the synchronization core:
#   - creates a fresh Data::Storage::Container unless $self->{container}
#     already holds one
#   - registers every entry of $self->{storages} (name => storage) with the
#     container via addStorage()
#
# Always returns 1.
sub _init {
  my ($self) = @_;

  # build new container if necessary
  $self->{container} ||= Data::Storage::Container->new();

  # add storages to container (optional)
  foreach my $storage_name (keys %{$self->{storages}}) {
    my $storage = $self->{storages}->{$storage_name};
    $self->{container}->addStorage($storage_name, $storage);
  }

  return 1;
}
151
# _initV1()
#
# Obsolete initializer. It logs a debug message and then dies
# unconditionally, so the tagging code below the die() is never executed; it
# is kept for reference only.
sub _initV1 {
  my ($self) = @_;

  $logger->debug( __PACKAGE__ . "->_initV1" );
  die("this should not be reached!");

  # tag storages with id-authority and checksum-provider information
  # TODO: better store tag inside metadata to hold bits together!
  my @taggings = (
    [ id_authorities       => 'isIdentAuthority' ],
    [ checksum_authorities => 'isChecksumAuthority' ],
    [ write_protected      => 'isWriteProtected' ],
  );
  foreach my $tagging (@taggings) {
    my ($list_key, $flag) = @{$tagging};
    foreach my $storage_name (@{$self->{$list_key}}) {
      $self->{container}->{storage}->{$storage_name}->{$flag} = 1;
    }
  }
}
162
163
# TODO: abstract the hardwired use of "source" and "target" in here somehow

# _run()
#
# Main synchronization loop: walks over all nodes of the "source" side and
# transfers new or modified ones to the "target" side.
#
# The list of nodes is taken from (first match wins):
#   1. $self->{args}->{objectSet}
#   2. _getNodeList('source', $filter) if $self->{meta}->{source}->{filter} is set
#   3. _getNodeList('source')
#
# For each node:
#   - the "read" callbacks from $self->{meta}->{source}->{Callback} are applied
#   - fields listed in $self->{meta}->{source}->{subnodes_exclude} are deleted
#   - the ident is resolved and the matching target node is stat'ed/loaded
#   - the new/dirty status is determined (only syncMethod "checksum" is
#     implemented)
#   - the node is inserted into or updated in the target, unless the target
#     storage is write-protected
#
# Progress characters printed to STDOUT while verbose:
#   ":" node started, "n" target stat/load failed, "s" skipped,
#   "c" checksums read, "[lm]" locally modified, "e" error, "t" transfer ok,
#   "r" ident/checksum written back to source ("retransmit")
#
# Returns the statistics object ($tc) after the loop has finished. Returns
# nothing (bare return) if there are no nodes, if no syncMethod is
# configured, or - aborting the whole run - if a source node has no ident.
sub _run {

  my $self = shift;

  # NOTE(review): verbose output is forced on here, overriding whatever was
  # configured in $self->{verbose} - looks like a debugging leftover; confirm
  # before changing, output is kept as-is for now.
  $self->{verbose} = 1;

  $logger->debug( __PACKAGE__ . "->_run" );

  # for statistics
  my $tc = Hash::Serializer->new( {} );
  my $results;

  # set of objects is already in $self->{args}
  # TODO: make independent of the terminology "object..."
  $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};

  # apply filter
  if (my $filter = $self->{meta}->{source}->{filter}) {
    $results ||= $self->_getNodeList('source', $filter);
  }

  # get reference to node list from convenient method provided by CORE-HANDLE
  $results ||= $self->_getNodeList('source');

  # checkpoint: do we actually have a list to iterate through?
  if (!$results || !@{$results}) {
    $logger->notice( __PACKAGE__ . "->_run: No nodes to synchronize." );
    return;
  }

  # check if we actually *have* a synchronization method
  if (!$self->{options}->{metadata}->{syncMethod}) {
    $logger->critical( __PACKAGE__ . "->_run: No synchronization method (checksum|timestamp) specified" );
    return;
  }

  # iterate over a shallow copy of the list (as before)
  my @results = @{$results};

  # iterate through set
  foreach my $source_node_real (@results) {

    print ":" if $self->{verbose};

    $tc->{total}++;

    # clone object (in case we have to modify it here)
    # TODO:
    #   - is a "deep_copy" needed here if occurring modifications take place?
    #   - i guess a deep_copy would destroy tangram mechanisms?
    #   - after all, just take care for now that this object doesn't get updated!
    #   - so, just use its reference for now - if some cloning is needed in future, do this here!
    my $source_node = $source_node_real;

    my $descent = 'source';

    # modify entry - handle new style callbacks (the readers)
    # handle callbacks right now while scanning them (asymmetric to the writers)
    if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {

      foreach my $property (keys %{$callbacks->{read}}) {

        # no value is handed to read-callbacks (yet)
        my $value;

        # ------------ half-redundant: make $self->callCallback($object, $value, $opts)
        # The callback is the function "<nodeType>::<property>_read". It is
        # resolved to a code reference by name (\&{"name"} is permitted under
        # "strict refs") and called inside a block eval instead of compiling
        # a string built from metadata.
        my $perl_callback = $self->{meta}->{$descent}->{nodeType} . '::' . $property . '_read';
        my $cb_result = eval {
          my $code = \&{$perl_callback};
          $code->( {
            object   => $source_node,
            property => $property,
            value    => $value,
            storage  => $self->{meta}->{$descent}->{storage},
          } );
        };
        if ($@) {
          # a failing callback leaves the property untouched
          $logger->error( __PACKAGE__ . "->_run: $@" );
          next;
        }
        # ------------ half-redundant: make $self->callCallback($object, $value, $opts)

        $source_node->{$property} = $cb_result;

      }

    }

    # exclude defined fields (simply delete from object);
    # tolerate a missing "subnodes_exclude" list
    foreach my $excluded (@{ $self->{meta}->{source}->{subnodes_exclude} || [] }) {
      delete $source_node->{$excluded};
    }

    # here we accumulate information about the status of the current node (payload/row/object/item/entry)
    $self->{node} = {};
    $self->{node}->{source}->{payload} = $source_node;

    # determine ident of entry
    my $identOK = $self->_resolveNodeIdent('source');
    if (!$identOK) {
      $logger->critical( __PACKAGE__ . "->_run: No ident found in source node ( nodeName='$self->{meta}->{source}->{nodeName}', nodeType='$self->{meta}->{source}->{nodeType}') try to \"prepare\" this node first?" );
      return;
    }

    my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

    # mark node as new either if there's no ident or if stat/load failed
    if (!$statOK) {
      $self->{node}->{status}->{new} = 1;
      print "n" if $self->{verbose};
    }

    # determine status of entry by synchronization method
    if ( lc $self->{options}->{metadata}->{syncMethod} eq 'checksum' ) {

      # calculate local checksum of source node
      $self->handleLocalChecksum('source');

      # read checksum of source node
      if (!$self->readChecksum('source')) {
        $logger->warning( __PACKAGE__ . "->_run: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
        $tc->{skip}++;
        print "s" if $self->{verbose};
        next;
      }

      # get checksum from synchronization target
      $self->readChecksum('target');

      # determine if entry is "new" or "dirty"
      # after all, this seems to be the point where the hammer falls.....
      print "c" if $self->{verbose};

      # calculate new/dirty status:
      # a node without a target checksum is "new"; otherwise it is "dirty"
      # if the source checksum is missing, differs from the target checksum,
      # or if the "force" argument is set
      my $source_checksum = $self->{node}->{source}->{checksum};
      my $target_checksum = $self->{node}->{target}->{checksum};
      $self->{node}->{status}->{new} = !$target_checksum;
      if (!$self->{node}->{status}->{new}) {
        $self->{node}->{status}->{dirty} =
          !$source_checksum ||
          ($source_checksum ne $target_checksum) ||
          $self->{args}->{force};
      }

      # new 2004-06-17: also check if local checksum is inconsistent
      # (undefined checksums compare like empty strings, without warnings)
      my $local_stored     = $self->{node}->{source}->{checksum_local_storage};
      my $local_calculated = $self->{node}->{source}->{checksum_local_calculated};
      $local_stored     = '' unless defined $local_stored;
      $local_calculated = '' unless defined $local_calculated;
      if ($local_stored ne $local_calculated) {
        $self->{node}->{status}->{dirty_local} = 1;
        $self->{node}->{status}->{dirty} = 1;
      }

    } else {
      $logger->warning( __PACKAGE__ . "->_run: Synchronization method '$self->{options}->{metadata}->{syncMethod}' is not implemented" );
      $tc->{skip}++;
      print "s" if $self->{verbose};
      next;
    }

    # new 2004-06-17: also update local checksum
    if ($self->{node}->{status}->{dirty_local}) {
      $tc->{locally_modified}++;
      print "[lm]" if $self->{verbose};
      $self->_doModify_LocalChecksum('source');
    }

    # first reaction on entry-status: continue with next entry if the current is already "in sync"
    if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
      $tc->{in_sync}++;
      next;
    }

    # build map to actually transfer the data from source to target
    if (!$self->buildAttributeMap()) {
      push( @{$tc->{error_per_row}}, "Attribute Map could not be created. Will not insert or modify node $self->{node}->{source}->{ident}.");
      $tc->{error}++;
      print "e" if $self->{verbose};
      next;
    }

    # additional (new) checks for feature "write-protection"
    if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
      $tc->{attempt_transfer}++;
      print "\n" if $self->{verbose};
      $logger->notice( __PACKAGE__ . "->_run: Target is write-protected. Will not insert or modify node. " .
        "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
      print "\n" if $self->{verbose};
      $tc->{skip}++;
      next;
    }

    # transfer contents of map to target
    if ($self->{node}->{status}->{new}) {
      $tc->{attempt_new}++;
      $self->_doTransferToTarget('insert');
      # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
      $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
      $self->readChecksum('target');

    } elsif ($self->{node}->{status}->{dirty}) {
      $tc->{attempt_modify}++;
      # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
      $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}};
      $self->_doTransferToTarget('update');
      $self->readChecksum('target');
    }

    if ($self->{node}->{status}->{ok}) {
      $tc->{ok}++;
      print "t" if $self->{verbose};
    }

    if ($self->{node}->{status}->{error}) {
      $tc->{error}++;
      push( @{$tc->{error_per_row}}, $self->{node}->{status}->{error} );
      print "e" if $self->{verbose};
    }

    # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
    # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
    if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{isIdentAuthority}) {
      print "r" if $self->{verbose};
      $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
    }

  }

  print "\n" if $self->{verbose};

  # build user-message from some stats
  # (interpolates the statistics object $tc itself into the message)
  my $msg = "statistics: $tc";

  if ($tc->{error_per_row}) {
    $msg .= "\n";
    $msg .= "errors from \"error_per_row\":" . "\n";
    $msg .= Dumper($tc->{error_per_row});
  }

  # todo!!!
  #sysevent( { usermsg => $msg, level => $level }, $taskEvent );
  $logger->info($msg . "\n");

  return $tc;

}
485
486
# _doTransferToTarget( $action )
#
# Writes the attribute map of the current node ($self->{node}->{map}) to the
# "target" side by delegating to _modifyNode().
#
#   $action - passed through to _modifyNode(); _run() uses 'insert' and 'update'
#
# Returns whatever _modifyNode() returns.
sub _doTransferToTarget {
  my ($self, $action) = @_;
  return $self->_modifyNode( 'target', $action, $self->{node}->{map} );
}
498
499
# _doModifySource_IdentChecksum( $ident_new )
#
# Changes an old source node into a new one by writing back
#   - the new ident (stored under the source IdentProvider's "arg" key) and
#   - the current target checksum (stored under "cs")
# via _modifyNode('source', 'update', ...).
#
# TODO:
#   - eventually introduce an external resource to store this data to
#   - we won't have to "re"-modify the source node here
#
# Returns whatever _modifyNode() returns.
sub _doModifySource_IdentChecksum {
  my ($self, $ident_new) = @_;

  my $ident_key = $self->{meta}->{source}->{IdentProvider}->{arg};

  # "cs" is assigned last, so it takes precedence should both keys coincide
  my %update;
  $update{$ident_key} = $ident_new;
  $update{cs}         = $self->{node}->{target}->{checksum};

  return $self->_modifyNode('source', 'update', \%update);
}
518
# _doModify_LocalChecksum( $descent )
#
# Stores the freshly calculated local checksum of the current node
# ($self->{node}->{$descent}->{checksum_local_calculated}) in the field
# "cs_local" of the node on the given side.
#
#   $descent - side to update; _run() calls this with 'source'
#
# Returns whatever _modifyNode() returns.
sub _doModify_LocalChecksum {
  my ($self, $descent) = @_;

  my %update = (
    cs_local => $self->{node}->{$descent}->{checksum_local_calculated},
  );

  return $self->_modifyNode($descent, 'update', \%update);
}
527
# _prepareNode_MetaProperties( $descent )
#
# Inspects the first node of the given side and, depending on the comparison
# of the required meta properties (the IdentProvider's "arg" field and "cs")
# with the properties found in that node, sends one
# "ALTER TABLE ... ADD COLUMN ..." command per required property to the
# storage of that side.
#
# NOTE(review): the ALTER branch runs when isEmpty($diff) is *true*, while the
# warning text says properties are lacking - confirm against the semantics of
# getDifference()/isEmpty() that this condition is not inverted.
sub _prepareNode_MetaProperties {
  my ($self, $descent) = @_;

  $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

  # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
  my $nodes = $self->_getNodeList($descent);
  my $first_node = $nodes->[0];

  # check if node contains meta properties/nodes
  # TODO: "cs" is hardcoded here!
  my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
  my @found = keys %$first_node;
  my $diff = getDifference(\@required, \@found);

  if (isEmpty($diff)) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
    foreach my $column (@required) {
      my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $column";
      # result is not evaluated
      my $res = $self->{meta}->{$descent}->{storage}->sendCommand($sql);
    }
  }

}
561
562 # TODO: load column-metadata from reversed mapping-metadata
# _prepareNode_DummyIdent( $descent )
#
# Assigns a dummy ident to the nodes of the given side. For each node an
# update is issued through _modifyNode() which
#   - sets the IdentProvider's "arg" field to (counter + 5678983) . '0001'
#   - sets "cs" and "cs_local" to undef
# The update criteria are built from the properties returned by
# getDifference() that carry a true value, joined with ' AND '. Nodes for
# which getDifference() yields an empty list are skipped and do not advance
# the counter. A warning is logged if no node was touched at all.
#
# NOTE(review): properties with false values are left out of the criteria,
# so the criteria string can end up empty - presumably _modifyNode() copes
# with that; verify.
sub _prepareNode_DummyIdent {
  my ($self, $descent) = @_;

  $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

  my $nodes = $self->_getNodeList($descent);

  my $touched = 0;
  my $ident_base = 5678983;
  my $ident_appendix = '0001';

  foreach my $node (@$nodes) {

    my $ident_dummy = ($touched + $ident_base) . $ident_appendix;
    my $map = {
      $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
      cs => undef,
      cs_local => undef,
    };

    # diff lists and ...
    my $diff = getDifference([keys %$node], [keys %$map]);
    next if $#{$diff} < 0;

    # ... build criteria including all columns (with a true name and value)
    my @usable = grep { $_ && $node->{$_} } @$diff;
    my @crits = map { "$_='" . quotesql($node->{$_}) . "'" } @usable;
    my $crit = join(' AND ', @crits);
    print "p" if $self->{verbose};

    $self->_modifyNode($descent, 'update', $map, $crit);
    $touched++;
  }

  print "\n" if $self->{verbose};

  if (!$touched) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
  }

}
613
614 # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
# _otherSide( $descent )
#
# Returns the name of the opposite synchronization side:
# 'source' for 'target', 'target' for 'source' and the empty string for
# anything else.
sub _otherSide {
  my ($self, $descent) = @_;
  return $descent eq 'target' ? 'source'
       : $descent eq 'source' ? 'target'
       :                        '';
}
622
623
624 1;
625 __END__

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed