/[cvs]/nfo/perl/libs/Data/Transfer/Sync/Core.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync/Core.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.9 - (show annotations)
Tue May 13 08:17:52 2003 UTC (21 years, 2 months ago) by joko
Branch: MAIN
Changes since 1.8: +16 -8 lines
buildAttributeMap now propagates error

1 ## -------------------------------------------------------------------------
2 ##
3 ## $Id: Core.pm,v 1.8 2003/03/27 15:31:15 joko Exp $
4 ##
5 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
6 ##
7 ## See COPYRIGHT section in pod text below for usage and distribution rights.
8 ##
9 ## -------------------------------------------------------------------------
10 ## $Log: Core.pm,v $
11 ## Revision 1.8 2003/03/27 15:31:15 joko
12 ## fixes to modules regarding new namespace(s) below Data::Mungle::*
13 ##
14 ## Revision 1.7 2003/02/21 08:01:11 joko
15 ## debugging, logging
16 ## renamed module
17 ##
18 ## Revision 1.6 2003/02/14 14:03:49 joko
19 ## + logging, debugging
20 ## - refactored code to sister module
21 ##
22 ## Revision 1.5 2003/02/11 05:30:47 joko
23 ## + minor fixes and some debugging mud
24 ##
25 ## Revision 1.4 2003/02/09 05:01:10 joko
26 ## + major structure changes
27 ## - refactored code to sister modules
28 ##
29 ## Revision 1.3 2003/01/20 17:01:14 joko
30 ## + cosmetics and debugging
31 ## + probably refactored code to new plugin-modules 'Metadata.pm' and/or 'StorageInterface.pm' (guess it was the last one...)
32 ##
33 ## Revision 1.2 2003/01/19 02:05:42 joko
34 ## - removed pod-documentation: now in Data/Transfer/Sync.pod
35 ##
36 ## Revision 1.1 2003/01/19 01:23:04 joko
37 ## + new from Data/Transfer/Sync.pm
38 ##
39 ## Revision 1.11 2002/12/23 07:10:59 joko
40 ## + using MD5 for checksum generation again - the 32-bit integer hash from DBI seems to be too lazy
41 ##
42 ## Revision 1.10 2002/12/19 01:07:16 joko
43 ## + fixed output done via $logger
44 ##
45 ## Revision 1.9 2002/12/16 07:02:34 jonen
46 ## + added comment
47 ##
48 ## Revision 1.8 2002/12/15 02:03:09 joko
49 ## + fixed logging-messages
50 ## + additional metadata-checks
51 ##
52 ## Revision 1.7 2002/12/13 21:49:34 joko
53 ## + sub configure
54 ## + sub checkOptions
55 ##
56 ## Revision 1.6 2002/12/06 04:49:10 jonen
57 ## + disabled output-puffer here
58 ##
59 ## Revision 1.5 2002/12/05 08:06:05 joko
60 ## + bugfix with determining empty fields (Null) with DBD::CSV
61 ## + debugging
62 ## + updated comments
63 ##
64 ## Revision 1.4 2002/12/03 15:54:07 joko
65 ## + {import}-flag is now {prepare}-flag
66 ##
67 ## Revision 1.3 2002/12/01 22:26:59 joko
68 ## + minor cosmetics for logging
69 ##
70 ## Revision 1.2 2002/12/01 04:43:25 joko
71 ## + mapping detail entries may now be either an ARRAY or a HASH
72 ## + erase flag is used now (for export-operations)
73 ## + expressions to refer to values inside deep nested structures
74 ## - removed old mappingV2-code
75 ## + cosmetics
76 ## + sub _erase_all
77 ##
78 ## Revision 1.1 2002/11/29 04:45:50 joko
79 ## + initial check in
80 ##
81 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
82 ## + new
83 ## -------------------------------------------------------------------------
84
85
86 package Data::Transfer::Sync::Core;
87
88 use strict;
89 use warnings;
90
91 use mixin::with qw( Data::Transfer::Sync );
92
93
94 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - main
95
96 use Data::Dumper;
97
98 #use misc::HashExt;
99 use Hash::Serializer;
100 use Data::Mungle::Compare::Struct qw( getDifference isEmpty );
101 use Data::Storage::Container;
102 use DesignPattern::Object;
103 use shortcuts::database qw( quotesql );
104
105 # get logger instance
106 my $logger = Log::Dispatch::Config->instance;
107
108 $| = 1;
109
110 =pod
111 sub new {
112 my $invocant = shift;
113 my $class = ref($invocant) || $invocant;
114 my $self = {};
115 $logger->debug( __PACKAGE__ . "->new(@_)" );
116 bless $self, $class;
117 $self->configure(@_);
118 return $self;
119 }
120 =cut
121
122
123 sub _init {
124 my $self = shift;
125
126 # build new container if necessary
127 $self->{container} = Data::Storage::Container->new() if !$self->{container};
128
129 # add storages to container (optional)
130 foreach (keys %{$self->{storages}}) {
131 $self->{container}->addStorage($_, $self->{storages}->{$_});
132 }
133
134 # trace
135 #print Dumper($self);
136 #exit;
137
138 return 1;
139
140 }
141
142 sub _initV1 {
143 my $self = shift;
144 $logger->debug( __PACKAGE__ . "->_initV1" );
145 die("this should not be reached!");
146 # tag storages with id-authority and checksum-provider information
147 # TODO: better store tag inside metadata to hold bits together!
148 map { $self->{container}->{storage}->{$_}->{isIdentAuthority} = 1 } @{$self->{id_authorities}};
149 map { $self->{container}->{storage}->{$_}->{isChecksumAuthority} = 1; } @{$self->{checksum_authorities}};
150 map { $self->{container}->{storage}->{$_}->{isWriteProtected} = 1; } @{$self->{write_protected}};
151 }
152
153
154 # TODO: abstract the hardwired use of "source" and "target" in here somehow - hmmmm....... /(="ยง/%???
155 sub _run {
156
157 my $self = shift;
158
159 #print "verbose: ", $self->{verbose}, "\n";
160 $self->{verbose} = 1;
161
162 $logger->debug( __PACKAGE__ . "->_run" );
163
164 # for statistics
165 my $tc = Hash::Serializer->new( {} );
166 my $results;
167
168 # set of objects is already in $self->{args}
169 # TODO: make independent of the terminology "object..."
170 $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};
171
172 # apply filter
173 if (my $filter = $self->{meta}->{source}->{filter}) {
174 #print Dumper($filter);
175 #exit;
176 $results ||= $self->_getNodeList('source', $filter);
177 }
178
179 # get reference to node list from convenient method provided by CORE-HANDLE
180 $results ||= $self->_getNodeList('source');
181
182 # checkpoint: do we actually have a list to iterate through?
183 if (!$results || !@{$results}) {
184 $logger->notice( __PACKAGE__ . "->_run: No nodes to synchronize." );
185 return;
186 }
187
188
189
190 # check if we actually *have* a synchronization method
191 if (!$self->{options}->{metadata}->{syncMethod}) {
192 $logger->critical( __PACKAGE__ . "->_run: No synchronization method (checksum|timestamp) specified" );
193 return;
194 }
195
196
197 # dereference
198 my @results = @{$results};
199
200 # iterate through set
201 foreach my $source_node_real (@results) {
202
203 print ":" if $self->{verbose};
204
205 $tc->{total}++;
206
207 #print "======================== iter", "\n";
208
209 # clone object (in case we have to modify it here)
210 # TODO:
211 # - is a "deep_copy" needed here if occouring modifications take place?
212 # - puuhhhh, i guess a deep_copy would destroy tangram mechanisms?
213 # - after all, just take care for now that this object doesn't get updated!
214 # - so, just use its reference for now - if some cloning is needed in future, do this here!
215 my $source_node = $source_node_real;
216
217 # modify entry - handle new style callbacks (the readers)
218
219 # trace
220 #print Dumper($source_node);
221 #exit;
222
223 my $descent = 'source';
224
225 # handle callbacks right now while scanning them (asymmetric to the writers)
226 my $map_callbacks = {};
227 if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {
228
229 # trace
230 #print Dumper($callbacks);
231 #exit;
232
233 my $error = 0;
234
235 foreach my $node (keys %{$callbacks->{read}}) {
236
237 my $object = $source_node;
238 my $value; # = $source_node->{$node};
239
240 # trace
241 #print Dumper($self->{options});
242
243 # ------------ half-redundant: make $self->callCallback($object, $value, $opts)
244 #my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $node . '_read';
245 my $perl_callback = $self->{meta}->{$descent}->{nodeType} . '::' . $node . '_read';
246 my $evalstring = 'return ' . $perl_callback . '( { object => $object, property => $node, value => $value, storage => $self->{meta}->{$descent}->{storage} } );';
247 #print $evalstring, "\n"; exit;
248 my $cb_result = eval($evalstring);
249 if ($@) {
250 $error = 1;
251 $logger->error( __PACKAGE__ . "->_run: $@" );
252 next;
253 }
254 # ------------ half-redundant: make $self->callCallback($object, $value, $opts)
255
256 $source_node->{$node} = $cb_result;
257
258 }
259
260 }
261
262 # trace
263 #print Dumper($source_node);
264
265 # exclude defined fields (simply delete from object)
266 map { delete $source_node->{$_} } @{$self->{meta}->{source}->{subnodes_exclude}};
267
268 # here we accumulate information about the status of the current node (payload/row/object/item/entry)
269 $self->{node} = {};
270 $self->{node}->{source}->{payload} = $source_node;
271
272 # trace
273 #print Dumper($self->{node});
274 #exit;
275
276 # determine ident of entry
277 my $identOK = $self->_resolveNodeIdent('source');
278 #if (!$identOK && lc $self->{args}->{direction} ne 'import') {
279 if (!$identOK) {
280 #print Dumper($self->{meta}->{source});
281 $logger->critical( __PACKAGE__ . "->_run: No ident found in source node ( nodeName='$self->{meta}->{source}->{nodeName}', nodeType='$self->{meta}->{source}->{nodeType}') try to \"prepare\" this node first?" );
282 return;
283 }
284
285 #print "l" if $self->{verbose};
286 my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});
287
288 # mark node as new either if there's no ident or if stat/load failed
289 if (!$statOK) {
290 $self->{node}->{status}->{new} = 1;
291 print "n" if $self->{verbose};
292 }
293
294 #print "checksum", "\n";
295
296 #print Dumper($self);
297
298 # determine status of entry by synchronization method
299 if ( lc $self->{options}->{metadata}->{syncMethod} eq 'checksum' ) {
300 #if ( $statOK && (lc $self->{args}->{method} eq 'checksum') ) {
301 #if ( !$self->{node}->{status}->{new} && (lc $self->{args}->{method} eq 'checksum') ) {
302
303 # calculate checksum of source node
304 #$self->_calcChecksum('source');
305 if (!$self->_readChecksum('source')) {
306 $logger->warning( __PACKAGE__ . "->_run: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
307 $tc->{skip}++;
308 print "s" if $self->{verbose};
309 next;
310 }
311
312 # get checksum from synchronization target
313 $self->_readChecksum('target');
314 #if (!$self->_readChecksum('target')) {
315 # $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"target\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
316 # next;
317 #}
318
319 # pre flight check: do we actually have a checksum provided?
320 #if (!$self->{node}->{source}->{checksum}) {
321 # print "Source checksum for entry with ident \"$self->{node}->{source}->{ident}\" could not be calculated, maybe it's missing?.", "\n";
322 # return;
323 #}
324
325 # determine if entry is "new" or "dirty"
326 # after all, this seems to be the point where the hammer falls.....
327 print "c" if $self->{verbose};
328
329 # trace
330 #print Dumper($self->{node});
331 #exit;
332
333 $self->{node}->{status}->{new} = !$self->{node}->{target}->{checksum};
334 if (!$self->{node}->{status}->{new}) {
335 $self->{node}->{status}->{dirty} =
336 $self->{node}->{status}->{new} ||
337 (!$self->{node}->{source}->{checksum} || !$self->{node}->{target}->{checksum}) ||
338 ($self->{node}->{source}->{checksum} ne $self->{node}->{target}->{checksum}) ||
339 $self->{args}->{force};
340 }
341
342 } else {
343 $logger->warning( __PACKAGE__ . "->_run: Synchronization method '$self->{options}->{metadata}->{syncMethod}' is not implemented" );
344 $tc->{skip}++;
345 print "s" if $self->{verbose};
346 next;
347 }
348
349 # first reaction on entry-status: continue with next entry if the current is already "in sync"
350 if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
351 $tc->{in_sync}++;
352 next;
353 }
354
355 # build map to actually transfer the data from source to target
356 if (!$self->buildAttributeMap()) {
357 #$logger->warning( __PACKAGE__ . "->_run: Attribute Map could not be created. Will not insert or modify node.");
358 $tc->{skip}++;
359 print "e" if $self->{verbose};
360 next;
361 }
362
363 # trace
364 #print Dumper($self->{node}); exit;
365 #print "attempt", "\n";
366
367 # additional (new) checks for feature "write-protection"
368 if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
369 $tc->{attempt_transfer}++;
370 print "\n" if $self->{verbose};
371 $logger->notice( __PACKAGE__ . "->_run: Target is write-protected. Will not insert or modify node. " .
372 "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
373 print "\n" if $self->{verbose};
374 $tc->{skip}++;
375 next;
376 }
377
378 # trace
379 #print Dumper($self);
380 #exit;
381
382 # transfer contents of map to target
383 if ($self->{node}->{status}->{new}) {
384 $tc->{attempt_new}++;
385 $self->_doTransferToTarget('insert');
386 # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
387 #print Dumper($self->{node});
388 $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
389 $self->_readChecksum('target');
390
391 } elsif ($self->{node}->{status}->{dirty}) {
392 $tc->{attempt_modify}++;
393 # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
394 $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}};
395 $self->_doTransferToTarget('update');
396 $self->_readChecksum('target');
397 }
398
399 if ($self->{node}->{status}->{ok}) {
400 $tc->{ok}++;
401 print "t" if $self->{verbose};
402 }
403
404 if ($self->{node}->{status}->{error}) {
405 $tc->{error}++;
406 push( @{$tc->{error_per_row}}, $self->{node}->{status}->{error} );
407 print "e" if $self->{verbose};
408 }
409
410 # trace
411 #print Dumper($self);
412 #exit;
413
414 # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
415 # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
416 #if ($self->{node}->{status}->{ok} && $self->{options}->{target}->{storage}->{idAuthority}) {
417 if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{isIdentAuthority}) {
418 print "r" if $self->{verbose};
419 #print Dumper($self->{meta});
420 #print Dumper($self->{node});
421 #exit;
422 $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
423 }
424
425 }
426
427 print "\n" if $self->{verbose};
428
429 # build user-message from some stats
430 my $msg = "statistics: $tc";
431
432 if ($tc->{error_per_row}) {
433 $msg .= "\n";
434 $msg .= "errors from \"error_per_row\":" . "\n";
435 $msg .= Dumper($tc->{error_per_row});
436 }
437
438 # todo!!!
439 #sysevent( { usermsg => $msg, level => $level }, $taskEvent );
440 #$logger->info( __PACKAGE__ . "->_run: $msg" );
441 $logger->info($msg . "\n");
442
443 return $tc;
444
445 }
446
447
448 # refactor this as some core-function to do a generic dump resolving data-encapsulations of e.g. Set::Object
449 sub _dumpCompact {
450 my $self = shift;
451
452 #my $vars = \@_;
453 my @data = ();
454
455 my $count = 0;
456 foreach (@_) {
457 my $item = {};
458 foreach my $key (keys %$_) {
459 my $val = $_->{$key};
460
461 #print Dumper($val);
462
463 if (ref $val eq 'Set::Object') {
464 #print "========================= SET", "\n";
465 #print Dumper($val);
466 #print Dumper($val->members());
467 #$val = $val->members();
468 #$vars->[$count]->{$key} = $val->members() if $val->can("members");
469 #$item->{$key} = $val->members() if $val->can("members");
470 $item->{$key} = $val->members();
471 #print Dumper($vars->[$count]->{$key});
472
473 } else {
474 $item->{$key} = $val;
475 }
476
477 }
478 push @data, $item;
479 $count++;
480 }
481
482 #print "Dump:", Dumper(@data), "\n";
483
484 $Data::Dumper::Indent = 0;
485 my $result = Dumper(@data);
486 $Data::Dumper::Indent = 2;
487 return $result;
488
489 }
490
491
492
493 sub _doTransferToTarget {
494 my $self = shift;
495 my $action = shift;
496
497 # trace
498 #print Dumper($self->{meta});
499 #print Dumper($self->{node});
500 #exit;
501
502 $self->_modifyNode('target', $action, $self->{node}->{map});
503 }
504
505
506 sub _doModifySource_IdentChecksum {
507 my $self = shift;
508 my $ident_new = shift;
509 # this changes an old node to a new one including ident and checksum
510 # TODO:
511 # - eventually introduce an external resource to store this data to
512 # - we won't have to "re"-modify the source node here
513 my $map = {
514 $self->{meta}->{source}->{IdentProvider}->{arg} => $ident_new,
515 cs => $self->{node}->{target}->{checksum},
516 };
517
518 #print Dumper($map);
519 #print Dumper($self->{node});
520 #exit;
521
522 $self->_modifyNode('source', 'update', $map);
523 }
524
525
526
527 sub _prepareNode_MetaProperties {
528 my $self = shift;
529 my $descent = shift;
530
531 $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );
532
533 # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
534 my $list = $self->_getNodeList($descent);
535
536 # get first node
537 my $firstnode = $list->[0];
538
539 # check if node contains meta properties/nodes
540 # TODO: "cs" is hardcoded here!
541 my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
542 my @found = keys %$firstnode;
543 #my @diff = getDifference(\@found, \@required);
544 my $diff = getDifference(\@required, \@found);
545 #print Dumper(@found);
546 #print Dumper(@required);
547 #print Dumper(@diff);
548 #if (!$#diff || $#diff == -1) {
549 if (isEmpty($diff)) {
550 $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
551 foreach (@required) {
552 my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $_";
553 #print "sql: $sql", "\n";
554 my $res = $self->{meta}->{$descent}->{storage}->sendCommand($sql);
555 #print Dumper($res->getStatus());
556 }
557 }
558
559 }
560
561 sub _prepareNode_DummyIdent {
562 my $self = shift;
563 my $descent = shift;
564
565 $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );
566
567 my $list = $self->_getNodeList($descent);
568 #print Dumper($list);
569 my $i = 0;
570 my $ident_base = 5678983;
571 my $ident_appendix = '0001';
572 foreach my $node (@$list) {
573 my $ident_dummy = $i + $ident_base;
574 $ident_dummy .= $ident_appendix;
575 my $map = {
576 $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
577 cs => undef,
578 };
579
580 # diff lists and ...
581 my $diff = getDifference([keys %$node], [keys %$map]);
582 next if $#{$diff} == -1;
583
584 # ... build criteria including all columns
585 my @crits;
586 foreach my $property (@$diff) {
587 next if !$property;
588 my $value = $node->{$property};
589 next if !$value;
590 push @crits, "$property='" . quotesql($value) . "'";
591 }
592 my $crit = join ' AND ', @crits;
593 print "p" if $self->{verbose};
594
595 #print Dumper($map);
596 #print Dumper($crit);
597
598 $self->_modifyNode($descent, 'update', $map, $crit);
599 $i++;
600 }
601
602 #print "\n" if $self->{verbose};
603
604 if (!$i) {
605 $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
606 }
607
608 }
609
610 # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
611 sub _otherSide {
612 my $self = shift;
613 my $descent = shift;
614 return 'source' if $descent eq 'target';
615 return 'target' if $descent eq 'source';
616 return '';
617 }
618
619
620 1;
621 __END__

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed