/[cvs]/nfo/perl/libs/Data/Transfer/Sync/Core.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync/Core.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.8 - (show annotations)
Thu Mar 27 15:31:15 2003 UTC (21 years, 3 months ago) by joko
Branch: MAIN
Changes since 1.7: +6 -2 lines
fixes to modules regarding new namespace(s) below Data::Mungle::*

1 ## $Id: Core.pm,v 1.7 2003/02/21 08:01:11 joko Exp $
2 ##
3 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4 ##
5 ## See COPYRIGHT section in pod text below for usage and distribution rights.
6 ##
7 ## ----------------------------------------------------------------------------------------
8 ## $Log: Core.pm,v $
9 ## Revision 1.7 2003/02/21 08:01:11 joko
10 ## debugging, logging
11 ## renamed module
12 ##
13 ## Revision 1.6 2003/02/14 14:03:49 joko
14 ## + logging, debugging
15 ## - refactored code to sister module
16 ##
17 ## Revision 1.5 2003/02/11 05:30:47 joko
18 ## + minor fixes and some debugging mud
19 ##
20 ## Revision 1.4 2003/02/09 05:01:10 joko
21 ## + major structure changes
22 ## - refactored code to sister modules
23 ##
24 ## Revision 1.3 2003/01/20 17:01:14 joko
25 ## + cosmetics and debugging
26 ## + probably refactored code to new plugin-modules 'Metadata.pm' and/or 'StorageInterface.pm' (guess it was the last one...)
27 ##
28 ## Revision 1.2 2003/01/19 02:05:42 joko
29 ## - removed pod-documentation: now in Data/Transfer/Sync.pod
30 ##
31 ## Revision 1.1 2003/01/19 01:23:04 joko
32 ## + new from Data/Transfer/Sync.pm
33 ##
34 ## Revision 1.11 2002/12/23 07:10:59 joko
35 ## + using MD5 for checksum generation again - the 32-bit integer hash from DBI seems to be too lazy
36 ##
37 ## Revision 1.10 2002/12/19 01:07:16 joko
38 ## + fixed output done via $logger
39 ##
40 ## Revision 1.9 2002/12/16 07:02:34 jonen
41 ## + added comment
42 ##
43 ## Revision 1.8 2002/12/15 02:03:09 joko
44 ## + fixed logging-messages
45 ## + additional metadata-checks
46 ##
47 ## Revision 1.7 2002/12/13 21:49:34 joko
48 ## + sub configure
49 ## + sub checkOptions
50 ##
51 ## Revision 1.6 2002/12/06 04:49:10 jonen
52 ## + disabled output-buffer here
53 ##
54 ## Revision 1.5 2002/12/05 08:06:05 joko
55 ## + bugfix with determining empty fields (Null) with DBD::CSV
56 ## + debugging
57 ## + updated comments
58 ##
59 ## Revision 1.4 2002/12/03 15:54:07 joko
60 ## + {import}-flag is now {prepare}-flag
61 ##
62 ## Revision 1.3 2002/12/01 22:26:59 joko
63 ## + minor cosmetics for logging
64 ##
65 ## Revision 1.2 2002/12/01 04:43:25 joko
66 ## + mapping detail entries may now be either an ARRAY or a HASH
67 ## + erase flag is used now (for export-operations)
68 ## + expressions to refer to values inside deep nested structures
69 ## - removed old mappingV2-code
70 ## + cosmetics
71 ## + sub _erase_all
72 ##
73 ## Revision 1.1 2002/11/29 04:45:50 joko
74 ## + initial check in
75 ##
76 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
77 ## + new
78 ## ----------------------------------------------------------------------------------------
79
80
81 package Data::Transfer::Sync::Core;
82
83 use strict;
84 use warnings;
85
86 use mixin::with qw( Data::Transfer::Sync );
87
88
89 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - main
90
91 use Data::Dumper;
92
93 #use misc::HashExt;
94 use Hash::Serializer;
95 use Data::Mungle::Compare::Struct qw( getDifference isEmpty );
96 use Data::Storage::Container;
97 use DesignPattern::Object;
98 use libdb qw( quotesql );
99
100 # get logger instance
101 my $logger = Log::Dispatch::Config->instance;
102
103 $| = 1;
104
105 =pod
106 sub new {
107 my $invocant = shift;
108 my $class = ref($invocant) || $invocant;
109 my $self = {};
110 $logger->debug( __PACKAGE__ . "->new(@_)" );
111 bless $self, $class;
112 $self->configure(@_);
113 return $self;
114 }
115 =cut
116
117
# Prepares the storage container used by this synchronization instance.
#
# An existing $self->{container} is reused; otherwise a new
# Data::Storage::Container is created. Each entry of $self->{storages}
# (optional) is then registered with the container under its hash key
# via addStorage().
#
# Always returns 1.
sub _init {
  my ($self) = @_;

  # lazily build the container
  $self->{container} ||= Data::Storage::Container->new();

  # register the configured storages by name
  for my $storage_name (keys %{ $self->{storages} }) {
    $self->{container}->addStorage( $storage_name, $self->{storages}->{$storage_name} );
  }

  return 1;
}
136
# Obsolete initializer, kept for reference only.
#
# Logs a debug message and then dies unconditionally, so the tagging code
# below the die() is never executed.
sub _initV1 {
  my ($self) = @_;

  $logger->debug( __PACKAGE__ . "->_initV1" );
  die("this should not be reached!");

  # --- unreachable legacy code ---
  # tag storages with id-authority and checksum-provider information
  # TODO: better store tag inside metadata to hold bits together!
  for my $name (@{ $self->{id_authorities} }) {
    $self->{container}->{storage}->{$name}->{isIdentAuthority} = 1;
  }
  for my $name (@{ $self->{checksum_authorities} }) {
    $self->{container}->{storage}->{$name}->{isChecksumAuthority} = 1;
  }
  for my $name (@{ $self->{write_protected} }) {
    $self->{container}->{storage}->{$name}->{isWriteProtected} = 1;
  }
}
147
148
# TODO: abstract the hardwired use of "source" and "target" in here somehow

# Main synchronization loop: walks the source node list and inserts or
# updates the corresponding nodes on the target side.
#
# Node list selection (first one that yields a value wins):
#   1. $self->{args}->{objectSet}
#   2. $self->_getNodeList('source', $filter), if a source filter is configured
#   3. $self->_getNodeList('source')
#
# Per node:
#   - run the configured "read" callbacks and store their results in the node
#   - delete the subnodes listed in {meta}->{source}->{subnodes_exclude}
#   - resolve the source ident and stat/load the node on the target side
#   - compare checksums (the only implemented syncMethod is 'checksum')
#   - skip nodes that are in sync, honour target write-protection
#   - insert (new) or update (dirty) the target node
#   - if the transfer was ok and the target is an ident authority, write the
#     target ident and checksum back to the source ("retransmit")
#
# Progress characters printed while verbose:
#   ':' node started, 'n' stat/load on target failed (treated as new),
#   's' skipped, 'c' checksums read, 't' transfer ok, 'e' error, 'r' retransmit
#
# Returns the statistics object ($tc) with the counters total, skip, in_sync,
# attempt_transfer, attempt_new, attempt_modify, ok, error and the list
# error_per_row. Returns nothing (bare return) when there are no nodes, no
# syncMethod is configured, or a source node has no resolvable ident; in the
# last case the whole run is aborted at that node.
sub _run {

  my $self = shift;

  # NOTE(review): verbosity is unconditionally forced on here, overriding
  # whatever the caller configured - looks like a debugging leftover.
  $self->{verbose} = 1;

  $logger->debug( __PACKAGE__ . "->_run" );

  # for statistics
  my $tc = Hash::Serializer->new( {} );
  my $results;

  # set of objects is already in $self->{args}
  # TODO: make independent of the terminology "object..."
  $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};

  # apply filter
  if (my $filter = $self->{meta}->{source}->{filter}) {
    $results ||= $self->_getNodeList('source', $filter);
  }

  # get reference to node list from convenient method provided by CORE-HANDLE
  $results ||= $self->_getNodeList('source');

  # checkpoint: do we actually have a list to iterate through?
  if (!$results || !@{$results}) {
    $logger->notice( __PACKAGE__ . "->_run: No nodes to synchronize." );
    return;
  }

  # check if we actually *have* a synchronization method
  if (!$self->{options}->{metadata}->{syncMethod}) {
    $logger->critical( __PACKAGE__ . "->_run: No synchronization method (checksum|timestamp) specified" );
    return;
  }

  # dereference (iterate over a copy of the list)
  my @results = @{$results};

  # iterate through set
  foreach my $source_node_real (@results) {

    print ":" if $self->{verbose};

    $tc->{total}++;

    # clone object (in case we have to modify it here)
    # TODO:
    #   - is a "deep_copy" needed here if occurring modifications take place?
    #   - a deep_copy would probably destroy tangram mechanisms
    #   - after all, just take care for now that this object doesn't get updated!
    #   - so, just use its reference for now - if some cloning is needed in future, do this here!
    my $source_node = $source_node_real;

    my $descent = 'source';

    # handle new style callbacks (the readers) right now while scanning them
    # (asymmetric to the writers)
    if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {

      foreach my $node (keys %{$callbacks->{read}}) {

        # TODO: half-redundant, make this $self->callCallback($object, $value, $opts)
        # The callback is the function "<nodeType>::<property>_read". It is
        # resolved by name and called inside a block eval instead of compiling
        # a code string, so metadata content is never evaluated as Perl source.
        my $perl_callback = $self->{meta}->{$descent}->{nodeType} . '::' . $node . '_read';
        my $cb_result = eval {
          no strict 'refs';
          my $callback = \&{$perl_callback};
          $callback->( {
            object   => $source_node,
            property => $node,
            value    => undef,
            storage  => $self->{meta}->{$descent}->{storage},
          } );
        };
        if ($@) {
          # a failing callback is logged; the property is left untouched
          $logger->error( __PACKAGE__ . "->_run: $@" );
          next;
        }

        $source_node->{$node} = $cb_result;

      }

    }

    # exclude defined fields (simply delete from object)
    delete $source_node->{$_} for @{ $self->{meta}->{source}->{subnodes_exclude} };

    # here we accumulate information about the status of the current node (payload/row/object/item/entry)
    $self->{node} = {};
    $self->{node}->{source}->{payload} = $source_node;

    # determine ident of entry
    my $identOK = $self->_resolveNodeIdent('source');
    if (!$identOK) {
      $logger->critical( __PACKAGE__ . "->_run: No ident found in source node ( nodeName='$self->{meta}->{source}->{nodeName}', nodeType='$self->{meta}->{source}->{nodeType}') try to \"prepare\" this node first?" );
      # aborts the whole run, not just this node
      return;
    }

    my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

    # mark node as new if stat/load failed
    if (!$statOK) {
      $self->{node}->{status}->{new} = 1;
      print "n" if $self->{verbose};
    }

    # determine status of entry by synchronization method
    if ( lc($self->{options}->{metadata}->{syncMethod}) eq 'checksum' ) {

      # read checksum of source node
      if (!$self->_readChecksum('source')) {
        $logger->warning( __PACKAGE__ . "->_run: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
        $tc->{skip}++;
        print "s" if $self->{verbose};
        next;
      }

      # get checksum from synchronization target (result deliberately not checked)
      $self->_readChecksum('target');

      # determine if entry is "new" or "dirty"
      print "c" if $self->{verbose};

      # a node without target checksum counts as new (overrides the stat/load result above)
      $self->{node}->{status}->{new} = !$self->{node}->{target}->{checksum};
      if (!$self->{node}->{status}->{new}) {
        # the target checksum is known to be set here, so the node is dirty if
        # the source checksum is missing, the checksums differ, or the
        # transfer is forced
        $self->{node}->{status}->{dirty} =
          !$self->{node}->{source}->{checksum} ||
          ($self->{node}->{source}->{checksum} ne $self->{node}->{target}->{checksum}) ||
          $self->{args}->{force};
      }

    } else {
      $logger->warning( __PACKAGE__ . "->_run: Synchronization method '$self->{options}->{metadata}->{syncMethod}' is not implemented" );
      $tc->{skip}++;
      print "s" if $self->{verbose};
      next;

    }

    # first reaction on entry-status: continue with next entry if the current is already "in sync"
    if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
      $tc->{in_sync}++;
      next;
    }

    # build map to actually transfer the data from source to target
    $self->buildAttributeMap();

    # additional (new) checks for feature "write-protection"
    if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
      $tc->{attempt_transfer}++;
      print "\n" if $self->{verbose};
      $logger->notice( __PACKAGE__ . "->_run: Target is write-protected. Will not insert or modify node. " .
        "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
      print "\n" if $self->{verbose};
      $tc->{skip}++;
      next;
    }

    # transfer contents of map to target
    if ($self->{node}->{status}->{new}) {
      $tc->{attempt_new}++;
      $self->_doTransferToTarget('insert');
      # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
      $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
      $self->_readChecksum('target');

    } elsif ($self->{node}->{status}->{dirty}) {
      $tc->{attempt_modify}++;
      # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
      $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}};
      $self->_doTransferToTarget('update');
      $self->_readChecksum('target');
    }

    if ($self->{node}->{status}->{ok}) {
      $tc->{ok}++;
      print "t" if $self->{verbose};
    }

    if ($self->{node}->{status}->{error}) {
      $tc->{error}++;
      push( @{$tc->{error_per_row}}, $self->{node}->{status}->{error} );
      print "e" if $self->{verbose};
    }

    # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
    # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
    if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{isIdentAuthority}) {
      print "r" if $self->{verbose};
      $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
    }

  }

  print "\n" if $self->{verbose};

  # build user-message from some stats
  # NOTE(review): this interpolates the statistics object itself; presumably
  # Hash::Serializer provides a string representation - verify.
  my $msg = "statistics: $tc";

  if ($tc->{error_per_row}) {
    $msg .= "\n";
    $msg .= "errors from \"error_per_row\":" . "\n";
    $msg .= Dumper($tc->{error_per_row});
  }

  # todo!!!
  #sysevent( { usermsg => $msg, level => $level }, $taskEvent );
  $logger->info($msg . "\n");

  return $tc;

}
438
439
# TODO: refactor this as some core-function to do a generic dump resolving
# data-encapsulations of e.g. Set::Object

# Returns a compact (unindented) Data::Dumper rendering of the given items.
#
# Arguments: a list of hash references. Each one is shallow-copied; any value
# that is a Set::Object is replaced by an array reference holding all of its
# members, so the dump shows the set's content instead of the object wrapper.
#
# Returns the string produced by Dumper() for the copied items (one $VARn
# statement per argument, an empty string when called without items).
sub _dumpCompact {
  my $self = shift;

  my @data;
  foreach my $entry (@_) {
    my %item;
    foreach my $key (keys %$entry) {
      my $val = $entry->{$key};
      # members() returns a list; collect it into an array reference so that
      # every member shows up in the dump (a scalar assignment would keep
      # only a single one)
      $item{$key} = ( ref($val) eq 'Set::Object' ) ? [ $val->members() ] : $val;
    }
    push @data, \%item;
  }

  # switch indentation off for this call only; "local" restores whatever
  # value the caller had configured instead of forcing it to a fixed one
  local $Data::Dumper::Indent = 0;
  return Dumper(@data);

}
482
483
484
# Writes the attribute map of the current node to the target side.
#
# $action is passed through to _modifyNode() unchanged; _run calls this with
# 'insert' or 'update'. The data written is $self->{node}->{map}.
#
# Returns whatever _modifyNode() returns.
sub _doTransferToTarget {
  my ($self, $action) = @_;

  return $self->_modifyNode( 'target', $action, $self->{node}->{map} );
}
496
497
# Rewrites the current source node so that it carries a new ident and the
# checksum of the target node.
#
# $ident_new is stored under the property named by
# {meta}->{source}->{IdentProvider}->{arg}; the checksum is stored under the
# (hardcoded) property 'cs'. The change is applied via
# _modifyNode('source', 'update', ...), whose result is returned.
#
# TODO:
#   - eventually introduce an external resource to store this data to
#   - then we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
  my ($self, $ident_new) = @_;

  my %update = (
    $self->{meta}->{source}->{IdentProvider}->{arg} => $ident_new,
    cs => $self->{node}->{target}->{checksum},
  );

  return $self->_modifyNode( 'source', 'update', \%update );
}
516
517
518
# Checks the first node of the given side ($descent) for the meta properties
# needed by the synchronization (the ident property named by
# IdentProvider->{arg} and the hardcoded 'cs' property) and issues
# "ALTER TABLE ... ADD COLUMN ..." commands through the side's storage handle.
#
# NOTE(review): the ALTER statements run when isEmpty($diff) is true, where
# $diff = getDifference(\@required, \@found). If getDifference returns the
# required names that are missing, this condition looks inverted - confirm
# against Data::Mungle::Compare::Struct before relying on it.
# NOTE(review): the table name is taken from {meta}->{$descent}->{node}, while
# _run reads {nodeName} / {nodeType} - verify which key is populated.
sub _prepareNode_MetaProperties {
  my ($self, $descent) = @_;

  $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

  # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
  my $node_list  = $self->_getNodeList($descent);
  my $first_node = $node_list->[0];

  # compare the required meta properties with what the first node offers
  # TODO: "cs" is hardcoded here!
  my @required  = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
  my @available = keys %$first_node;
  my $diff = getDifference( \@required, \@available );

  if (isEmpty($diff)) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
    for my $column (@required) {
      my $sql = 'ALTER TABLE ' . $self->{meta}->{$descent}->{node} . ' ADD COLUMN ' . $column;
      # result is currently not inspected
      my $res = $self->{meta}->{$descent}->{storage}->sendCommand($sql);
    }
  }

}
552
# Assigns a generated dummy ident (and an undefined 'cs' checksum) to the
# nodes of the given side ($descent).
#
# For every node returned by _getNodeList($descent):
#   - a dummy ident is built as (number of nodes touched so far + 5678983)
#     followed by the string '0001'
#   - getDifference() is applied to the node's keys and the keys of the
#     update map; nodes with an empty result are skipped
#   - the remaining properties with a true value are turned into an
#     " AND "-joined criteria string (values passed through quotesql)
#   - the node is updated via _modifyNode($descent, 'update', $map, $crit)
#
# Logs a warning when no node was touched.
#
# NOTE(review): properties whose value is false (undef, '', 0) are left out
# of the criteria; if all of them are, the criteria string is empty. What
# _modifyNode does with an empty criteria is not visible here - verify.
sub _prepareNode_DummyIdent {
  my ($self, $descent) = @_;

  $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

  my $list = $self->_getNodeList($descent);

  my $touched        = 0;
  my $ident_base     = 5678983;
  my $ident_appendix = '0001';

  foreach my $node (@$list) {
    my $ident_dummy = ( $touched + $ident_base ) . $ident_appendix;
    my $map = {
      $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
      cs => undef,
    };

    # diff lists and ...
    my $diff = getDifference( [ keys %$node ], [ keys %$map ] );
    next if $#{$diff} < 0;

    # ... build criteria including all columns that carry a value
    my @crits =
      map  { "$_='" . quotesql( $node->{$_} ) . "'" }
      grep { $_ && $node->{$_} }
      @$diff;
    my $crit = join ' AND ', @crits;
    print "p" if $self->{verbose};

    $self->_modifyNode( $descent, 'update', $map, $crit );
    $touched++;
  }

  if (!$touched) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
  }

}
601
# TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)

# Returns the name of the opposite synchronization side:
# 'source' for 'target', 'target' for 'source', and an empty string for
# anything else.
sub _otherSide {
  my ($self, $descent) = @_;

  return $descent eq 'target' ? 'source'
       : $descent eq 'source' ? 'target'
       :                        '';
}
610
611
612 1;
613 __END__

MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed