/[cvs]/nfo/perl/libs/Data/Transfer/Sync/Core.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync/Core.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.13 - (show annotations)
Sat Jun 19 16:53:38 2004 UTC (20 years ago) by joko
Branch: MAIN
CVS Tags: HEAD
Changes since 1.12: +13 -4 lines
fix: handle local checksum only if database is configured with "hasLocalChecksum"

1 ## -------------------------------------------------------------------------
2 ##
3 ## $Id: Core.pm,v 1.12 2004/06/19 01:45:08 joko Exp $
4 ##
5 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
6 ##
7 ## See COPYRIGHT section in pod text below for usage and distribution rights.
8 ##
9 ## -------------------------------------------------------------------------
10 ## $Log: Core.pm,v $
11 ## Revision 1.12 2004/06/19 01:45:08 joko
12 ## introduced "local checksum"-mechanism
13 ## moved _dumpCompact to ::Compare::Checksum
14 ##
15 ## Revision 1.11 2004/05/11 20:03:48 jonen
16 ## bugfix[joko] related to Attribute Map
17 ##
18 ## Revision 1.10 2003/06/25 23:03:57 joko
19 ## no debugging
20 ##
21 ## Revision 1.9 2003/05/13 08:17:52 joko
22 ## buildAttributeMap now propagates error
23 ##
24 ## Revision 1.8 2003/03/27 15:31:15 joko
25 ## fixes to modules regarding new namespace(s) below Data::Mungle::*
26 ##
27 ## Revision 1.7 2003/02/21 08:01:11 joko
28 ## debugging, logging
29 ## renamed module
30 ##
31 ## Revision 1.6 2003/02/14 14:03:49 joko
32 ## + logging, debugging
33 ## - refactored code to sister module
34 ##
35 ## Revision 1.5 2003/02/11 05:30:47 joko
36 ## + minor fixes and some debugging mud
37 ##
38 ## Revision 1.4 2003/02/09 05:01:10 joko
39 ## + major structure changes
40 ## - refactored code to sister modules
41 ##
42 ## Revision 1.3 2003/01/20 17:01:14 joko
43 ## + cosmetics and debugging
44 ## + probably refactored code to new plugin-modules 'Metadata.pm' and/or 'StorageInterface.pm' (guess it was the last one...)
45 ##
46 ## Revision 1.2 2003/01/19 02:05:42 joko
47 ## - removed pod-documentation: now in Data/Transfer/Sync.pod
48 ##
49 ## Revision 1.1 2003/01/19 01:23:04 joko
50 ## + new from Data/Transfer/Sync.pm
51 ##
52 ## Revision 1.11 2002/12/23 07:10:59 joko
53 ## + using MD5 for checksum generation again - the 32-bit integer hash from DBI seems to be too lazy
54 ##
55 ## Revision 1.10 2002/12/19 01:07:16 joko
56 ## + fixed output done via $logger
57 ##
58 ## Revision 1.9 2002/12/16 07:02:34 jonen
59 ## + added comment
60 ##
61 ## Revision 1.8 2002/12/15 02:03:09 joko
62 ## + fixed logging-messages
63 ## + additional metadata-checks
64 ##
65 ## Revision 1.7 2002/12/13 21:49:34 joko
66 ## + sub configure
67 ## + sub checkOptions
68 ##
69 ## Revision 1.6 2002/12/06 04:49:10 jonen
70 ## + disabled output-puffer here
71 ##
72 ## Revision 1.5 2002/12/05 08:06:05 joko
73 ## + bugfix with determining empty fields (Null) with DBD::CSV
74 ## + debugging
75 ## + updated comments
76 ##
77 ## Revision 1.4 2002/12/03 15:54:07 joko
78 ## + {import}-flag is now {prepare}-flag
79 ##
80 ## Revision 1.3 2002/12/01 22:26:59 joko
81 ## + minor cosmetics for logging
82 ##
83 ## Revision 1.2 2002/12/01 04:43:25 joko
84 ## + mapping detail entries may now be either an ARRAY or a HASH
85 ## + erase flag is used now (for export-operations)
86 ## + expressions to refer to values inside deep nested structures
87 ## - removed old mappingV2-code
88 ## + cosmetics
89 ## + sub _erase_all
90 ##
91 ## Revision 1.1 2002/11/29 04:45:50 joko
92 ## + initial check in
93 ##
94 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
95 ## + new
96 ## -------------------------------------------------------------------------
97
98
99 package Data::Transfer::Sync::Core;
100
101 use strict;
102 use warnings;
103
104 use mixin::with qw( Data::Transfer::Sync );
105
106
107 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - main
108
109 use Data::Dumper;
110
111 #use misc::HashExt;
112 use Hash::Serializer;
113 use Data::Mungle::Compare::Struct qw( getDifference isEmpty );
114 use Data::Mungle::Transform::Deep qw( deep_copy expand );
115 use Data::Storage::Container;
116 use DesignPattern::Object;
117 use shortcuts::database qw( quotesql );
118
119 # get logger instance
120 my $logger = Log::Dispatch::Config->instance;
121
122 $| = 1;
123
124 =pod
125 sub new {
126 my $invocant = shift;
127 my $class = ref($invocant) || $invocant;
128 my $self = {};
129 $logger->debug( __PACKAGE__ . "->new(@_)" );
130 bless $self, $class;
131 $self->configure(@_);
132 return $self;
133 }
134 =cut
135
136
# Prepare the storage container used by this synchronization instance.
#
# A new Data::Storage::Container is created only when $self->{container} is
# not set yet. Afterwards every entry found in $self->{storages} is handed
# to the container via addStorage( name, storage ).
#
# Returns: always 1.
sub _init {
    my $self = shift;

    # build new container if necessary
    unless ($self->{container}) {
        $self->{container} = Data::Storage::Container->new();
    }

    # add storages to container (optional)
    foreach my $storage_name (keys %{ $self->{storages} }) {
        $self->{container}->addStorage(
            $storage_name,
            $self->{storages}->{$storage_name},
        );
    }

    return 1;
}
155
# Legacy initializer, retained for reference only.
#
# Logs a debug message and then dies unconditionally, so the tagging code
# below the die() is never executed.
sub _initV1 {
    my $self = shift;
    $logger->debug( __PACKAGE__ . "->_initV1" );
    die("this should not be reached!");

    # tag storages with id-authority and checksum-provider information
    # TODO: better store tag inside metadata to hold bits together!
    my @tagging = (
        [ id_authorities       => 'isIdentAuthority'    ],
        [ checksum_authorities => 'isChecksumAuthority' ],
        [ write_protected      => 'isWriteProtected'    ],
    );
    foreach my $entry (@tagging) {
        my ($list_key, $flag) = @{$entry};
        foreach my $storage_name (@{ $self->{$list_key} }) {
            $self->{container}->{storage}->{$storage_name}->{$flag} = 1;
        }
    }
}
166
167
168 # TODO: abstract the hardwired use of "source" and "target" in here somehow - hmmmm....... /(="§/%???
# Main synchronization loop: walk all "source" nodes and bring the "target"
# in sync with them.
#
# The node list is taken from $self->{args}->{objectSet} if present,
# otherwise from _getNodeList('source' [, filter]). For each node:
#   1. read callbacks from $self->{meta}->{source}->{Callback}->{read} are
#      applied and fields listed in "subnodes_exclude" are removed,
#   2. the node ident is resolved and the matching target node is loaded,
#   3. with syncMethod "checksum" the new/dirty status is derived from the
#      source and target checksums (plus the local checksum if the source
#      locator has "hasLocalChecksum" set); other methods are skipped,
#   4. new nodes are inserted and dirty nodes are updated in the target,
#   5. if the target is an ident authority, ident and checksum are written
#      back to the source.
#
# While $self->{verbose} is set, one progress marker is printed per step:
#   ":" node started, "n" target not found, "c" checksums compared,
#   "s" skipped, "[lm]" locally modified, "e" error, "t" transferred,
#   "r" ident/checksum written back to source.
#
# Returns: the statistics object (Hash::Serializer) after a complete pass.
#          A bare return (undef / empty list) is issued when there are no
#          nodes, no synchronization method is configured, or a source node
#          has no ident.
#
# NOTE(review): verbosity is unconditionally forced on at the top of this
# sub, overriding whatever the caller configured - confirm this is wanted.
#
# TODO: abstract the hardwired use of "source" and "target" in here somehow
sub _run {

    my $self = shift;

    $self->{verbose} = 1;

    $logger->debug( __PACKAGE__ . "->_run" );

    # for statistics
    my $tc = Hash::Serializer->new( {} );
    my $results;

    # set of objects is already in $self->{args}
    # TODO: make independent of the terminology "object..."
    $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};

    # apply filter
    if (my $filter = $self->{meta}->{source}->{filter}) {
        $results ||= $self->_getNodeList('source', $filter);
    }

    # get reference to node list from convenient method provided by CORE-HANDLE
    $results ||= $self->_getNodeList('source');

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@{$results}) {
        $logger->notice( __PACKAGE__ . "->_run: No nodes to synchronize." );
        return;
    }

    # check if we actually *have* a synchronization method
    if (!$self->{options}->{metadata}->{syncMethod}) {
        $logger->critical( __PACKAGE__ . "->_run: No synchronization method (checksum|timestamp) specified" );
        return;
    }

    # work on a copy of the list so the iteration is independent of the
    # array behind the reference
    my @results = @{$results};

    # iterate through set
    foreach my $source_node_real (@results) {

        print ":" if $self->{verbose};

        $tc->{total}++;

        # clone object (in case we have to modify it here)
        # TODO:
        #   - is a "deep_copy" needed here if occurring modifications take place?
        #   - a deep_copy might destroy tangram mechanisms?
        #   - for now just use the reference and take care that this object
        #     doesn't get updated; if cloning is needed in future, do it here!
        my $source_node = $source_node_real;

        my $descent = 'source';

        # handle new style read callbacks right now while scanning them
        # (asymmetric to the writers)
        if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {

            foreach my $node (keys %{$callbacks->{read}}) {

                # The callback is the function "<nodeType>::<property>_read".
                # It is resolved through a symbolic code reference and called
                # inside a block eval, so no Perl source is ever compiled
                # from metadata strings.
                my $perl_callback = $self->{meta}->{$descent}->{nodeType} . '::' . $node . '_read';
                my $callback = do {
                    no strict 'refs';
                    \&{$perl_callback};
                };

                my $cb_result = eval {
                    $callback->( {
                        object   => $source_node,
                        property => $node,
                        value    => undef,
                        storage  => $self->{meta}->{$descent}->{storage},
                    } );
                };
                if ($@) {
                    # a failing callback leaves the property untouched
                    $logger->error( __PACKAGE__ . "->_run: $@" );
                    next;
                }

                $source_node->{$node} = $cb_result;

            }

        }

        # exclude defined fields (simply delete from object);
        # an unset exclusion list is treated as empty
        foreach my $excluded (@{ $self->{meta}->{source}->{subnodes_exclude} || [] }) {
            delete $source_node->{$excluded};
        }

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = {};
        $self->{node}->{source}->{payload} = $source_node;

        # determine ident of entry
        my $identOK = $self->_resolveNodeIdent('source');
        if (!$identOK) {
            $logger->critical( __PACKAGE__ . "->_run: No ident found in source node ( nodeName='$self->{meta}->{source}->{nodeName}', nodeType='$self->{meta}->{source}->{nodeType}') try to \"prepare\" this node first?" );
            return;
        }

        my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

        # mark node as new if stat/load failed
        if (!$statOK) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $self->{verbose};
        }

        # determine status of entry by synchronization method
        if ( lc($self->{options}->{metadata}->{syncMethod}) eq 'checksum' ) {

            # calculate local checksum of source node, but only if requested
            if ($self->{options}->{source}->{storage}->{handle}->{locator}->{hasLocalChecksum}) {
                $self->handleLocalChecksum('source');
            }

            # read checksum of source node
            if (!$self->readChecksum('source')) {
                $logger->warning( __PACKAGE__ . "->_run: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $self->{verbose};
                next;
            }

            # get checksum from synchronization target
            $self->readChecksum('target');

            # determine if entry is "new" or "dirty"
            print "c" if $self->{verbose};

            my $cs_source = $self->{node}->{source}->{checksum};
            my $cs_target = $self->{node}->{target}->{checksum};

            # a node without target checksum counts as new
            $self->{node}->{status}->{new} = !$cs_target;

            # Otherwise it is dirty if the source checksum is missing, the
            # checksums differ, or the caller forces a transfer. (Inside
            # this branch the target checksum is known to be true, so it
            # needs no further test.)
            if (!$self->{node}->{status}->{new}) {
                $self->{node}->{status}->{dirty} =
                    !$cs_source ||
                    ($cs_source ne $cs_target) ||
                    $self->{args}->{force};
            }

            # also check if local checksum is inconsistent, but only if requested
            if ($self->{options}->{source}->{storage}->{handle}->{locator}->{hasLocalChecksum}) {
                my $cs_local_stored     = $self->{node}->{source}->{checksum_local_storage};
                my $cs_local_calculated = $self->{node}->{source}->{checksum_local_calculated};
                # compare undef like an empty string, without "uninitialized" warnings
                $cs_local_stored     = '' unless defined $cs_local_stored;
                $cs_local_calculated = '' unless defined $cs_local_calculated;
                if ($cs_local_stored ne $cs_local_calculated) {
                    $self->{node}->{status}->{dirty_local} = 1;
                    $self->{node}->{status}->{dirty} = 1;
                }
            }

        } else {
            $logger->warning( __PACKAGE__ . "->_run: Synchronization method '$self->{options}->{metadata}->{syncMethod}' is not implemented" );
            $tc->{skip}++;
            print "s" if $self->{verbose};
            next;
        }

        # also update local checksum
        if ($self->{node}->{status}->{dirty_local}) {
            $tc->{locally_modified}++;
            print "[lm]" if $self->{verbose};
            $self->_doModify_LocalChecksum('source');
        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next;
        }

        # build map to actually transfer the data from source to target
        if (!$self->buildAttributeMap()) {
            push( @{$tc->{error_per_row}}, "Attribute Map could not be created. Will not insert or modify node $self->{node}->{source}->{ident}.");
            $tc->{error}++;
            print "e" if $self->{verbose};
            next;
        }

        # additional (new) checks for feature "write-protection"
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $self->{verbose};
            $logger->notice( __PACKAGE__ . "->_run: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $self->{verbose};
            $tc->{skip}++;
            next;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->readChecksum('target');

        } elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}};
            $self->_doTransferToTarget('update');
            $self->readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $self->{verbose};
        }

        if ($self->{node}->{status}->{error}) {
            $tc->{error}++;
            push( @{$tc->{error_per_row}}, $self->{node}->{status}->{error} );
            print "e" if $self->{verbose};
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{isIdentAuthority}) {
            print "r" if $self->{verbose};
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
        }

    }

    print "\n" if $self->{verbose};

    # build user-message from some stats
    my $msg = "statistics: $tc";

    if ($tc->{error_per_row}) {
        $msg .= "\n";
        $msg .= "errors from \"error_per_row\":" . "\n";
        $msg .= Dumper($tc->{error_per_row});
    }

    # todo: route this through the event system instead of plain logging
    $logger->info($msg . "\n");

    return $tc;

}
494
495
# Write the attribute map of the current node to the target.
#
# Parameters:
#   $action - passed through unchanged to _modifyNode (this file calls it
#             with 'insert' and 'update')
#
# Returns: whatever _modifyNode('target', $action, $self->{node}->{map})
#          returns.
sub _doTransferToTarget {
    my ($self, $action) = @_;
    return $self->_modifyNode('target', $action, $self->{node}->{map});
}
507
508
# Write a new ident and the target checksum back to the source node.
#
# The update map holds two entries: the source's ident property (named by
# $self->{meta}->{source}->{IdentProvider}->{arg}) set to $ident_new, and
# "cs" set to the checksum currently stored for the target node.
#
# TODO:
#   - eventually introduce an external resource to store this data to
#   - then the source node would not have to be "re"-modified here
#
# Parameters:
#   $ident_new - ident value to store in the source node
#
# Returns: whatever _modifyNode('source', 'update', $map) returns.
sub _doModifySource_IdentChecksum {
    my ($self, $ident_new) = @_;

    my $ident_property = $self->{meta}->{source}->{IdentProvider}->{arg};

    my %update = ( $ident_property => $ident_new );
    $update{cs} = $self->{node}->{target}->{checksum};

    return $self->_modifyNode('source', 'update', \%update);
}
527
# Store the freshly calculated local checksum of a node.
#
# Updates the node on the given side with a single-entry map:
# "cs_local" => $self->{node}->{$descent}->{checksum_local_calculated}.
#
# Parameters:
#   $descent - side to update (this file calls it with 'source')
#
# Returns: whatever _modifyNode($descent, 'update', $map) returns.
sub _doModify_LocalChecksum {
    my ($self, $descent) = @_;

    my %update = (
        cs_local => $self->{node}->{$descent}->{checksum_local_calculated},
    );

    return $self->_modifyNode($descent, 'update', \%update);
}
536
# Check the first node of a side for the meta properties needed by the
# synchronization (the ident property and "cs") and, depending on the
# outcome, send one "ALTER TABLE ... ADD COLUMN ..." command per required
# property to the storage of that side.
#
# Parameters:
#   $descent - side to inspect; used as key into $self->{meta}
#
# NOTE(review): the ALTER branch runs when isEmpty() reports the result of
# getDifference(required, found) as empty. Whether that really means
# "properties are missing" depends on the semantics of those two helpers,
# which are not visible here - confirm the condition is not inverted.
#
# TODO: "cs" is hardcoded here!
# TODO: fetching the whole list just to look at the first node should
#       (better) become "$self->_getFirstNode($descent)"
sub _prepareNode_MetaProperties {
    my ($self, $descent) = @_;

    $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

    # get first node
    my $node_list  = $self->_getNodeList($descent);
    my $first_node = $node_list->[0];

    # properties we need vs. properties the node actually has
    my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
    my @present  = keys %$first_node;
    my $difference = getDifference(\@required, \@present);

    if (isEmpty($difference)) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
        foreach my $column (@required) {
            my $table = $self->{meta}->{$descent}->{node};
            my $statement = "ALTER TABLE $table ADD COLUMN $column";
            my $response = $self->{meta}->{$descent}->{storage}->sendCommand($statement);
        }
    }

}
570
571 # TODO: load column-metadata from reversed mapping-metadata
572 sub _prepareNode_DummyIdent {
573 my $self = shift;
574 my $descent = shift;
575
576 #print Dumper($self->{options});
577 $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );
578
579 my $list = $self->_getNodeList($descent);
580 #print Dumper($list);
581 my $i = 0;
582 my $ident_base = 5678983;
583 my $ident_appendix = '0001';
584 foreach my $node (@$list) {
585 my $ident_dummy = $i + $ident_base;
586 $ident_dummy .= $ident_appendix;
587 my $map = {
588 $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
589 cs => undef,
590 cs_local => undef,
591 };
592
593 # diff lists and ...
594 my $diff = getDifference([keys %$node], [keys %$map]);
595 next if $#{$diff} == -1;
596
597 # ... build criteria including all columns
598 my @crits;
599 foreach my $property (@$diff) {
600 next if !$property;
601 my $value = $node->{$property};
602 next if !$value;
603 push @crits, "$property='" . quotesql($value) . "'";
604 }
605 my $crit = join ' AND ', @crits;
606 print "p" if $self->{verbose};
607
608 #print Dumper($map);
609 #print Dumper($crit);
610
611 $self->_modifyNode($descent, 'update', $map, $crit);
612 $i++;
613 }
614
615 print "\n" if $self->{verbose};
616
617 if (!$i) {
618 $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
619 }
620
621 }
622
623 # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
# Return the name of the opposite synchronization side.
#
# Parameters:
#   $descent - 'source' or 'target'
#
# Returns: 'target' for 'source', 'source' for 'target', and the empty
#          string for anything else.
sub _otherSide {
    my ($self, $descent) = @_;

    return $descent eq 'target' ? 'source'
         : $descent eq 'source' ? 'target'
         :                        '';
}
631
632
633 1;
634 __END__

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed