/[cvs]/nfo/perl/libs/Data/Transfer/Sync.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync.pm

Parent Directory | Revision Log


Revision 1.1 - (show annotations)
Fri Nov 29 04:45:50 2002 UTC (21 years, 7 months ago) by joko
Branch: MAIN
+ initial check in

1 ## $Id$
2 ##
3 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4 ##
5 ## See COPYRIGHT section in pod text below for usage and distribution rights.
6 ##
7 ## ----------------------------------------------------------------------------------------
8 ## $Log$
9 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
10 ## + new
11 ## ----------------------------------------------------------------------------------------
12
13
14 package Data::Transfer::Sync;
15
16 use strict;
17 use warnings;
18
19 use Data::Dumper;
20 use misc::HashExt;
21 use libp qw( md5_base64 );
22 use libdb qw( quotesql hash2Sql );
23 use Data::Transform::OO qw( hash2object );
24 use Data::Compare::Struct qw( getDifference isEmpty );
25
# Logger shared by every sub of this package. It is fetched once, when the
# module is loaded, from Log::Dispatch::Config (presumably configured by the
# application before this module is loaded - TODO confirm).
my $logger = Log::Dispatch::Config->instance();
28
29
# Constructor. Takes a flat key/value option list, stores it verbatim in
# the object hash and runs _init() to set up the storage container.
# Works both as a class method and on an existing instance.
sub new {
    my $invocant = shift;
    my $class    = ref $invocant ? ref $invocant : $invocant;

    $logger->debug( __PACKAGE__ . "->new(@_)" );

    my $self = bless { @_ }, $class;
    $self->_init();

    return $self;
}
39
40
# Initializes the synchronization object:
#   - creates a Data::Storage::Container unless one was passed as "container",
#   - registers every entry of the optional "storages" hash with it,
#   - tags the storages named in the optional lists "id_authorities",
#     "checksum_authorities" and "write_protected" with the flags
#     isIdentAuthority, isChecksumAuthority and isWriteProtected.
# A missing list simply means that no storage receives that flag.
sub _init {
    my $self = shift;

    # build new container if necessary
    $self->{container} = Data::Storage::Container->new() if !$self->{container};

    # add storages to container (optional)
    my $storages = $self->{storages} || {};
    foreach my $dbkey (keys %$storages) {
        $self->{container}->addStorage($dbkey, $storages->{$dbkey});
    }

    # tag storages with id-authority, checksum-provider and write-protection information
    # TODO: better store tag inside metadata to hold bits together!
    my %flag_for = (
        id_authorities       => 'isIdentAuthority',
        checksum_authorities => 'isChecksumAuthority',
        write_protected      => 'isWriteProtected',
    );
    foreach my $option (sort keys %flag_for) {
        foreach my $dbkey (@{ $self->{$option} || [] }) {
            $self->{container}->{storage}->{$dbkey}->{ $flag_for{$option} } = 1;
        }
    }

    return;
}
59
60
# Synchronizes the node of one storage partner ("source") into the node of
# another partner ("target").
#
# $args is a hashref; the keys read here are:
#   source / target            "<dbkey>:<node>", e.g. "L:Country" or "R:countries.csv"
#   <partner>_filter           filter handed to the storage's getListFiltered()
#   <partner>_ident            "<method>:<arg>" IdentProvider, e.g. "property:oid"
#   <partner>_exclude          arrayref of properties to drop before transfer
#   <partner>_type             "<method>:<arg>" TypeProvider
#   <partner>_callbacks_write  arrayref of properties with "*_write" callbacks
#   <partner>_callbacks_read   arrayref of properties with "*_read" callbacks
#   mapping                    arrayref of [ "source:prop", "target:prop" ] pairs
#   direction                  "push", "pull" (source and target get swapped)
#                              or "full" (currently processed like "push")
#   import                     if true, prepare the source node first
# Further keys (method, force, objectSet, ...) are consumed by _syncNodes.
#
# Returns whatever _syncNodes returns, or nothing if a partner storage or
# node could not be reached.
# TODO: some feature to show off the progress of synchronization (cur/max * 100)
sub syncNodes {
    my $self = shift;
    my $args = shift;

    # remember arguments through the whole processing
    $self->{args} = $args;

    $logger->debug( __PACKAGE__ . "->syncNodes: starting" );

    # hash to hold and/or fill in metadata required for the processing
    $self->{meta} = {};

    # synchronization direction, normalized ('' if none given - avoids "uninitialized" warnings)
    my $direction = defined $self->{args}->{direction} ? lc $self->{args}->{direction} : '';

    # optical symbol (directed arrow) used in the log message below
    my %arrow_for = ( push => '->', pull => '<-', full => '<->' );
    my $direction_arrow = exists $arrow_for{$direction} ? $arrow_for{$direction} : '';

    # decompose identifiers for each partner
    # TODO: take this list from already established/given metadata
    foreach my $partner ('source', 'target') {
        my $meta = $self->{meta}->{$partner} = {};

        # Partner and Node (e.g.: "L:Country" or "R:countries.csv");
        # split at the first colon only, so node names that contain colons
        # themselves (e.g. class names like "My::Class") stay intact
        if (my $item = $self->{args}->{$partner}) {
            @{$meta}{qw( dbkey node )} = split /:/, $item, 2;
        }

        # Filter
        if (my $filter = $self->{args}->{ $partner . '_filter' }) {
            $meta->{filter} = $filter;
        }

        # IdentProvider ("<method>:<arg>")
        if (my $ident = $self->{args}->{ $partner . '_ident' }) {
            my ($method, $arg) = split /:/, $ident, 2;
            $meta->{IdentProvider} = { method => $method, arg => $arg };
        }

        # TODO: ChecksumProvider

        # exclude properties/subnodes
        if (my $exclude = $self->{args}->{ $partner . '_exclude' }) {
            $meta->{subnodes_exclude} = $exclude;
        }

        # TypeProvider ("<method>:<arg>")
        if (my $type = $self->{args}->{ $partner . '_type' }) {
            my ($method, $arg) = split /:/, $type, 2;
            $meta->{TypeProvider} = { method => $method, arg => $arg };
        }

        # Callbacks - writers (triggered _before_ writing to target) and
        # readers (triggered _after_ reading from source)
        foreach my $kind ('write', 'read') {
            my $properties = $self->{args}->{ $partner . '_callbacks_' . $kind } or next;
            $meta->{Callback}->{$kind}->{$_}++ foreach @$properties;
        }

        # relink the storage object into the metainfo (undef if the dbkey is unknown)
        $meta->{storage} = defined $meta->{dbkey}
            ? $self->{container}->{storage}->{ $meta->{dbkey} }
            : undef;
    }

    $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" );

    # build mapping: collect the mapped property names per partner
    foreach my $pair (@{ $self->{args}->{mapping} || [] }) {
        my ($left_partner,  $left_property)  = split /:/, $pair->[0], 2;
        my ($right_partner, $right_property) = split /:/, $pair->[1], 2;
        push @{ $self->{meta}->{$left_partner}->{childnodes} },  $left_property;
        push @{ $self->{meta}->{$right_partner}->{childnodes} }, $right_property;
    }

    # check partners/nodes: does partner exist / is node available?
    foreach my $partner (keys %{ $self->{meta} }) {
        my $storage = $self->{meta}->{$partner}->{storage};
        if (!$storage) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach storage for \"$partner\"." );
            return;
        }
        my $storage_type = $storage->{locator}->{type};
        next if defined $storage_type && $storage_type eq 'DBI';    # for DBD::CSV - re-enable for others
        my $node = $self->{meta}->{$partner}->{node};
        if (!$storage->existsChildNode($node)) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach \"$node\" at \"$partner\"." );
            return;
        }
    }

    # manipulate metainfo according to direction of synchronization:
    # - push: just do it ...
    # - pull: swap metadata, then process
    # - full: TODO: process, then swap metadata and (re-)start processing
    if ($direction eq 'pull') {
        ($self->{meta}->{source}, $self->{meta}->{target}) =
            ($self->{meta}->{target}, $self->{meta}->{source});
    }

    # import flag means: prepare the source node to be syncable
    # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage
    if ($self->{args}->{import}) {
        $self->_prepareNode_MetaProperties('source');
        $self->_prepareNode_DummyIdent('source');
    }

    return $self->_syncNodes();
}
195
196
# Core synchronization loop. Expects syncNodes() to have filled in
# $self->{args} and $self->{meta} (with "source" and "target" partners).
# For every source node it:
#   1. runs the read callbacks and drops excluded properties,
#   2. resolves the node's ident and looks up the matching target node,
#   3. decides whether the node is new, dirty (checksum mismatch or "force")
#      or already in sync (only method "checksum" compares checksums),
#   4. builds the transfer map and inserts/updates the target node, unless
#      the target storage is write-protected,
#   5. writes the target's ident and checksum back to the source if the
#      target storage is an ident authority.
# Returns the statistics object $tc (counters such as total, in_sync, skip,
# ok, error, attempt_new, attempt_modify), or nothing if there are no nodes
# or a source node has no ident.
# TODO: abstract the hardwired use of "source" and "target" in here somehow.
sub _syncNodes {
    my $self = shift;

    my $tc      = OneLineDumpHash->new( {} );
    my $verbose = $self->{verbose};
    my $method  = defined $self->{args}->{method} ? lc $self->{args}->{method} : '';

    # set of objects is already in $self->{args}
    # TODO: make independent of the terminology "object..."
    my $results = $self->{args}->{objectSet};

    # apply filter
    if (!$results && (my $filter = $self->{meta}->{source}->{filter})) {
        $results = $self->_getNodeList('source', $filter);
    }

    # fall back to the unfiltered node list
    $results ||= $self->_getNodeList('source');

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@$results) {
        $logger->notice( __PACKAGE__ . "->syncNodes: No nodes to synchronize." );
        return;
    }

    # iterate over a copy, so the storage's own list is never aliased
    my @nodes = @$results;

    NODE:
    foreach my $source_node (@nodes) {

        $tc->{total}++;

        # NOTE: $source_node is NOT a clone - read callbacks and excludes
        # modify the source object itself (a deep copy might break Tangram
        # mechanisms); just take care that this object doesn't get updated.
        $self->_applyReadCallbacks($source_node);

        # exclude defined fields (simply delete from object)
        if (my $exclude = $self->{meta}->{source}->{subnodes_exclude}) {
            delete @{$source_node}{@$exclude};
        }

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = {};
        $self->{node}->{source}->{payload} = $source_node;

        # determine ident of entry
        if (!$self->_resolveNodeIdent('source')) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." );
            return;
        }

        # mark node as new if stat/load of the target node failed
        if (!$self->_statloadNode('target', $self->{node}->{source}->{ident})) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $verbose;
        }

        # determine status of entry by synchronization method
        if ($method eq 'checksum') {

            if (!$self->_readChecksum('source')) {
                $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $verbose;
                next NODE;
            }

            # get checksum from synchronization target
            $self->_readChecksum('target');

            # determine if entry is "new" or "dirty"
            # after all, this seems to be the point where the hammer falls.....
            print "c" if $verbose;
            my $source_checksum = $self->{node}->{source}->{checksum};
            my $target_checksum = $self->{node}->{target}->{checksum};
            $self->{node}->{status}->{new} = !$target_checksum;
            if ($target_checksum) {
                $self->{node}->{status}->{dirty} =
                    !$source_checksum
                    || $source_checksum ne $target_checksum
                    || $self->{args}->{force};
            }
        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next NODE;
        }

        # build map to actually transfer the data from source to target
        $self->_buildMap();

        # additional (new) checks for feature "write-protection"
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $verbose;
            $logger->notice( __PACKAGE__ . "->syncNodes: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $verbose;
            $tc->{skip}++;
            next NODE;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->_readChecksum('target');
        }
        elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{ $self->{meta}->{target}->{IdentProvider}->{arg} };
            $self->_doTransferToTarget('update');
            $self->_readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $verbose;
        }

        if (my $error = $self->{node}->{status}->{error}) {
            $tc->{error}++;
            push @{ $tc->{error_per_row} }, $error;
            print "e" if $verbose;
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) {
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
            print "r" if $verbose;
        }

        print ":" if $verbose;
    }

    print "\n" if $verbose;

    # build user-message from some stats
    my $msg = "stats: $tc";
    if ($tc->{error_per_row}) {
        $msg .= "\n" . "errors:" . "\n" . Dumper($tc->{error_per_row});
    }

    # TODO: sysevent( { usermsg => $msg, level => $level }, $taskEvent );
    $logger->info( __PACKAGE__ . "->syncNodes: $msg" );

    return $tc;
}

# Runs the read callbacks registered for the "source" partner: for each
# registered property P the function "<source node class>::P_read" is called
# with { object, property, value => undef, storage } and its scalar result is
# stored in $source_node->{P}. Dies if a callback is not defined or dies itself.
sub _applyReadCallbacks {
    my $self        = shift;
    my $source_node = shift;

    my $descent   = 'source';
    my $callbacks = $self->{meta}->{$descent}->{Callback} or return;
    my $package   = $self->{meta}->{$descent}->{node};

    foreach my $property (keys %{ $callbacks->{read} || {} }) {
        # look the callback up as a code reference instead of compiling a string eval
        my $code = UNIVERSAL::can($package, $property . '_read')
            or die __PACKAGE__ . "->_syncNodes: read callback \""
                . (defined $package ? $package : '') . "::${property}_read\" is not defined";
        $source_node->{$property} = $code->({
            object   => $source_node,
            property => $property,
            value    => undef,
            storage  => $self->{meta}->{$descent}->{storage},
        });
    }

    return;
}
442
443
# Returns a compact, single-line Data::Dumper rendering of the given
# hashrefs (used by _calcChecksum as checksum input). Values that are
# Set::Object instances are replaced by the result of their members()
# call. The caller's $Data::Dumper::Indent setting is left untouched.
sub _dumpCompact {
    my $self = shift;

    my @data;
    foreach my $entry (@_) {
        my %item;
        foreach my $key (keys %$entry) {
            my $val = $entry->{$key};
            # NOTE(review): members() is evaluated in scalar context here (as
            # before); confirm this yields what the checksum should cover.
            $item{$key} = ref $val eq 'Set::Object' ? $val->members() : $val;
        }
        push @data, \%item;
    }

    # switch to single-line output only for this call; "local" restores the
    # previous setting on return, even if Dumper() dies
    local $Data::Dumper::Indent = 0;
    return Dumper(@data);
}
480
481
# Computes the checksum of the current "$descent" node ("source" or
# "target") and stores it in $self->{node}->{$descent}->{checksum}.
# The checksum is DBI::hash(..., 1) over the node's ident, a newline and
# the compact dump of its payload. Always returns 1.
# $specifier is accepted for compatibility but not used.
sub _calcChecksum {
    my ($self, $descent, $specifier) = @_;

    my $node = $self->{node}->{$descent};

    # fingerprint = ident + "\n" + compact dump of the payload
    my $fingerprint = $node->{ident} . "\n" . $self->_dumpCompact($node->{payload});

    # TODO: $logger->dump( ... );

    # 32-bit integer "hash" value from DBI (was: md5_base64 fingerprint)
    $self->{node}->{$descent}->{checksum} = DBI::hash($fingerprint, 1);

    # signal good
    return 1;
}
511
512
# Fills in $self->{node}->{$descent}->{checksum} for the current node.
# If the partner's storage is a checksum authority, the checksum is
# calculated locally via _calcChecksum(); otherwise it is taken from the
# payload's "cs" property.
# Returns 1, or nothing when there is no current node for $descent.
# TODO: don't have the checksum column/property hardcoded as "cs" here, make this configurable somehow
sub _readChecksum {
    my ($self, $descent) = @_;

    # no current node for this partner: signal checksum bad
    return unless $self->{node}->{$descent};

    my $is_authority = $self->{meta}->{$descent}->{storage}->{isChecksumAuthority};
    if (!$is_authority) {
        $self->{node}->{$descent}->{checksum} = $self->{node}->{$descent}->{payload}->{cs};
    }
    else {
        $self->_calcChecksum($descent);
    }

    # signal checksum good
    return 1;
}
544
545
# Builds $self->{node}->{map}: the property => value hash that
# _modifyNode() writes to the target. It always contains the target's
# ident property (set to the source ident) and "cs" (the source checksum).
# If $self->{args}->{mapping} is set, it also holds one entry per mapped
# property, taken from the source payload. Values are passed through
# quotesql() when the target storage type is "DBI".
# The "mappingV2" variant (nested expressions) is known to be broken; it
# relies on _resolveMapStepExpr and $self->{R}, neither of which is set up
# in this module as far as visible here.
sub _buildMap {
    my $self = shift;

    my $target_meta = $self->{meta}->{target};

    # field-structure for building sql:
    # mapping of target property (sql field) names to values
    my $map = $self->{node}->{map} = {};

    # manually set ...
    # ... object-id
    $map->{ $target_meta->{IdentProvider}->{arg} } = $self->{node}->{source}->{ident};
    # ... checksum
    $map->{cs} = $self->{node}->{source}->{checksum};

    # for transferring flat structures via simple (1:1) mapping
    # TODO: diff per property / property value
    if ($self->{args}->{mapping}) {
        # no resolved childnodes (e.g. an empty mapping) means nothing to map
        my $source_properties = $self->{meta}->{source}->{childnodes} || [];
        my $target_properties = $target_meta->{childnodes} || [];

        foreach my $idx (0 .. $#$source_properties) {
            my $source_property = $source_properties->[$idx];
            my $target_property = $target_properties->[$idx];

            # remember the current property names (property cache)
            $self->{node}->{source}->{propcache}->{property} = $source_property;
            $self->{node}->{target}->{propcache}->{property} = $target_property;

            # plain (scalar?) value from the source payload
            my $value = $self->{node}->{source}->{payload}->{$source_property};

            # encode values dependent on type of underlying storage here - expand cases...
            # (Tangram and LDAP currently take the raw value; TODO: encode utf8 for LDAP?)
            if ($target_meta->{storage}->{locator}->{type} eq 'DBI') {
                $value = quotesql($value);
            }

            # store value to property cache and transfer map
            $self->{node}->{source}->{propcache}->{value} = $value;
            $map->{$target_property} = $value;
        }
    }

    # for transferring deeply nested structures described by expressions
    # this currently does not work!
    # TODO: re-enable this!
    if ($self->{args}->{mappingV2}) {
        foreach my $mapStep (@{ $self->{args}->{mappingV2} }) {
            my $left_key  = $mapStep->{left};
            my $left_val  = _resolveMapStepExpr( $self->{node}->{source}->{payload}, $mapStep->{left} );
            my $right_key = $mapStep->{right};

            # method "v:1": transfer the left-hand expression itself instead of its value
            if ($mapStep->{method} && $mapStep->{method} eq 'v:1') {
                $left_val = $left_key;
            }

            $map->{$right_key} = $self->{R}->quoteSql($left_val);
        }
    }

    return;
}
642
# Resolves the ident of the current "$descent" node ("source"/"target")
# via the partner's IdentProvider metadata:
#   method "property"       -> the payload property named by "arg"
#   method "storage_method" -> result of calling the storage method named
#                              by "arg" with the payload
# Stores the result in $self->{node}->{$descent}->{ident}.
# Returns 1 if a true ident was found and nothing otherwise. As before, a
# false ident such as 0 or '' counts as "no ident".
sub _resolveNodeIdent {
    my $self    = shift;
    my $descent = shift;

    my $payload  = $self->{node}->{$descent}->{payload};
    my $provider = $self->{meta}->{$descent}->{IdentProvider} || {};
    my $method   = defined $provider->{method} ? $provider->{method} : '';
    my $arg      = $provider->{arg};

    # resolve to ident
    my $ident;
    if ($method eq 'property') {
        $ident = $payload->{$arg};
    }
    elsif ($method eq 'storage_method') {
        $ident = $self->{meta}->{$descent}->{storage}->$arg($payload);
    }

    $self->{node}->{$descent}->{ident} = $ident;

    return 1 if $ident;
    return;
}
676
677
# Writes $map (property => value) to the current "$descent" node
# ("source" or "target"). $action is "insert" or "update" (case-insensitive).
# For DBI updates, $crit optionally overrides the default WHERE clause.
#
# Properties with a registered write callback are removed from $map first.
# For Tangram storages, each "<node class>::<property>_write" callback is
# invoked after the object was written. For DBI storages these callbacks
# are currently NOT invoked.
#
# The outcome is recorded in $self->{node}->{status}: "ok" => 1 on success,
# or "error" => { ... } on failure.
# NOTE: $map is modified in place (callback and excluded properties are deleted).
#
# TODO: count the nodes that could not be modified inside the caller and
# provide a better overall message, e.g. "No nodes have been touched for
# modify: Do you have column-headers in your csv file?"
sub _modifyNode {
    my $self    = shift;
    my $descent = shift;
    my $action  = shift;
    my $map     = shift;
    my $crit    = shift;

    $action = defined $action ? lc $action : '';

    # transfer callback nodes from value map to callback map - handle them afterwards! (new style callbacks)
    my %callback_value;
    if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {
        foreach my $property (keys %{ $callbacks->{write} || {} }) {
            $callback_value{$property} = delete $map->{$property};
        }
    }

    my $type = $self->{meta}->{$descent}->{storage}->{locator}->{type};
    $type = '' if !defined $type;

    if ($type eq 'DBI') {
        # DBI speaks SQL
        $self->_modifyNode_DBI($descent, $action, $map, $crit);
    }
    elsif ($type eq 'Tangram') {
        # Tangram does it the oo-way (naturally)
        $self->_modifyNode_Tangram($descent, $action, $map, \%callback_value);
    }

    return;
}

# Sends an INSERT or UPDATE built from $map to a DBI storage and records the
# outcome in $self->{node}->{status}.
sub _modifyNode_DBI {
    my ($self, $descent, $action, $map, $crit) = @_;

    my $meta = $self->{meta}->{$descent};

    # translate map to sql
    my $sql_main;
    if ($action eq 'insert') {
        $sql_main = hash2Sql($meta->{node}, $map, 'SQL_INSERT');
    }
    elsif ($action eq 'update') {
        # NOTE(review): the ident is interpolated unquoted into SQL - confirm
        # it can never contain a single quote.
        $crit ||= "$meta->{IdentProvider}->{arg}='$self->{node}->{$descent}->{ident}'";
        $sql_main = hash2Sql($meta->{node}, $map, 'SQL_UPDATE', $crit);
    }
    else {
        $self->{node}->{status}->{error} = { errstr => "unsupported action \"$action\"" };
        return;
    }

    # transfer data
    my $sqlHandle = $meta->{storage}->sendCommand($sql_main);

    # handle errors
    if (!$sqlHandle) {
        $self->{node}->{status}->{error} = {
            statement => $sql_main,
            errstr    => 'no statement handle returned by sendCommand',
        };
    }
    elsif ($sqlHandle->err) {
        $self->{node}->{status}->{error} = {
            statement => $sql_main,
            state     => $sqlHandle->state,
            err       => $sqlHandle->err,
            errstr    => $sqlHandle->errstr,
        };
    }
    else {
        $self->{node}->{status}->{ok} = 1;
    }

    return;
}

# Inserts or updates a Tangram object from $map, runs the write callbacks
# and records the outcome in $self->{node}->{status}.
sub _modifyNode_Tangram {
    my ($self, $descent, $action, $map, $callback_value) = @_;

    my $meta      = $self->{meta}->{$descent};
    my $storage   = $meta->{storage};
    my $classname = $meta->{node};

    if ($action ne 'insert' && $action ne 'update') {
        $self->{node}->{status}->{error} = { errstr => "unsupported action \"$action\"" };
        return;
    }

    # properties to exclude (an exclude list is optional)
    my @exclude = @{ $meta->{subnodes_exclude} || [] };

    # the ident property is not written as a plain property
    if (my $identProvider = $meta->{IdentProvider}) {
        push @exclude, $identProvider->{arg};
    }

    # TypeProvider: the map value of property "arg" names the class to
    # instantiate; the property itself is not transferred
    if (my $typeProvider = $meta->{TypeProvider}) {
        $classname = $map->{ $typeProvider->{arg} };
        push @exclude, $typeProvider->{arg};
    }

    # exclude banned properties (remove from map)
    delete @{$map}{@exclude};

    my $object;
    if ($action eq 'insert') {
        # make the object persistent in four steps:
        # - raw create (perl / class tangram scope)
        # - engine insert (tangram scope) ... this establishes inheritance - don't try to fill in inherited properties before!
        # - raw fill-in from hash (perl scope)
        # - engine update (tangram scope) ... this updates all properties just filled in
        $object = $classname->new();
        $storage->insert($object);
        $object->{$_} = undef foreach keys %$map;
        hash2object($object, $map);
        $storage->update($object);

        # asymmetry: get ident after insert
        # TODO:
        # - just do this if it is an IdentAuthority
        # - use IdentProvider metadata here
        $self->{node}->{$descent}->{ident} = $storage->id($object);
    }
    else {
        # get fresh object from orm first, then mix in values
        $object = $storage->load($self->{node}->{$descent}->{ident});
        hash2object($object, $map);
        $storage->update($object);
    }

    # handle new style write callbacks (looked up as code references, no string eval)
    my @errors;
    my $package = $meta->{node};
    foreach my $property (keys %$callback_value) {
        my $callback_name = (defined $package ? $package : '') . '::' . $property . '_write';
        my $code = UNIVERSAL::can($package, $property . '_write');
        my $ok = $code && eval {
            $code->({
                object  => $object,
                value   => $callback_value->{$property},
                storage => $storage,
            });
            1;
        };
        if (!$ok) {
            my $reason = $code ? ($@ || 'unknown error') : 'callback not defined';
            push @errors, "$callback_name: $reason";
            $logger->error( __PACKAGE__ . "->_modifyNode: write callback $callback_name failed: $reason" );
            next;
        }
        # re-update@orm - only as long as no callback has failed
        $storage->update($object) if !@errors;
    }

    # handle errors
    if (@errors) {
        $self->{node}->{status}->{error} = {
            action => $action,
            errstr => join("\n", @errors),
        };
        # rollback: only a freshly inserted object may be removed again; an
        # updated object existed before this call and must not be erased
        $storage->erase($object) if $action eq 'insert';
    }
    else {
        $self->{node}->{status}->{ok} = 1;
    }

    return;
}
883
# Makes sure $self->{node}->{$descent} holds the node with the given ident.
# Unless a node is already cached for $descent, the partner's storage is
# queried for it (requesting only the "cs" subnode). Pass a true $force to
# re-query even if a node is cached.
# Returns 1 if a node is available afterwards. Returns nothing if no ident
# was given, the query reported an error, or no entry matched.
#
# TODO:
#   this should be split up into...
#   - a "_statNode" (should just touch the node to check for existence)
#   - a "_loadNode" (should load node completely)
#   - maybe additionally a "loadNodeProperty" (may specify properties to load)
#   - introduce $self->{nodecache} for this purpose
# TODO:
#   should we:
#   - not pass ident in here but resolve it via "$descent"?
#   - refactor this and stuff it with additional debug/error message
#   - this = the way the implicit load mechanism works
sub _statloadNode {
    my ($self, $descent, $ident, $force) = @_;

    # a node is already cached and no reload was requested
    return 1 if $self->{node}->{$descent} && !$force;

    # nothing to look up without an ident (this may lead to an insert later on)
    return if !$ident;

    my $meta   = $self->{meta}->{$descent};
    my $result = $meta->{storage}->sendQuery({
        node      => $meta->{node},
        subnodes  => [qw( cs )],
        criterias => [
            {
                key => $meta->{IdentProvider}->{arg},
                op  => 'eq',
                val => $ident,
            },
        ],
    });

    my $entry  = $result->getNextEntry();
    my $status = $result->getStatus();

    # TODO: enhance error handling (store inside tc)
    return if ($status && $status->{err}) || !$entry;

    # TODO: re-resolve ident from entry via metadata "IdentProvider"
    @{ $self->{node}->{$descent} }{qw( ident payload )} = ($ident, $entry);

    return 1;
}
946
# Writes the transfer map built for the current node ($self->{node}->{map})
# to the "target" partner using $action ("insert" or "update").
# Returns whatever _modifyNode returns.
sub _doTransferToTarget {
    my ($self, $action) = @_;
    my $transfer_map = $self->{node}->{map};
    return $self->_modifyNode('target', $action, $transfer_map);
}
952
# Writes a new ident and the target's checksum back into the current
# source node, as an "update" of the "source" partner via _modifyNode.
# Returns whatever _modifyNode returns.
# TODO:
# - eventually introduce an external resource to store this data to
# - we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
    my ($self, $ident_new) = @_;

    my $ident_property = $self->{meta}->{source}->{IdentProvider}->{arg};

    my %writeback;
    $writeback{$ident_property} = $ident_new;
    $writeback{cs}              = $self->{node}->{target}->{checksum};

    return $self->_modifyNode('source', 'update', \%writeback);
}
969
970
# Shortcut: fetch the (optionally filtered) list of entries of the node
# configured for side $descent, straight from that side's storage.
# ... let's try to avoid _any_ redundant code in here (ok... - at the cost of method lookups...)
sub _getNodeList {
  my ($self, $descent, $filter) = @_;
  my $side = $self->{meta}->{$descent};
  return $side->{storage}->getListFiltered($side->{node}, $filter);
}
979
980
# Make sure the node of side $descent carries the meta properties the sync
# engine relies on: the ident property named by the side's IdentProvider and
# the checksum property "cs".
#
# The node's structure is judged by the property names of its first entry.
# Every required property missing there is added by sending
# "ALTER TABLE <node> ADD COLUMN <property>" to the side's storage. Drivers
# without ALTER support (e.g. DBD::CSV) reject this; add the columns manually then.
# An empty node can not be inspected and is left alone (with a warning).
sub _prepareNode_MetaProperties {
  my $self = shift;
  my $descent = shift;

  $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

  # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
  my $list = $self->_getNodeList($descent);

  # get first node - without any entry there is no structure to inspect
  my $firstnode = $list ? $list->[0] : undef;
  if (!$firstnode) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is empty - can not check for meta properties" );
    return;
  }

  # check which meta properties/nodes are actually missing; only those need
  # to be added (altering a column that already exists would fail)
  # TODO: "cs" is hardcoded here!
  my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
  my @missing = grep { !exists $firstnode->{$_} } @required;
  return if !@missing;

  $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
  foreach my $property (@missing) {
    # NOTE(review): no column type is given; some databases may require one
    my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $property";
    $self->{meta}->{$descent}->{storage}->sendCommand($sql);
  }

  return;
}
1014
# Stamp a dummy ident (and an empty checksum "cs") onto the entries of the
# node of side $descent via _modifyNode(..., 'update', ...).
#
# Dummy idents are ( $ident_base + <number of entries updated so far> )
# followed by $ident_appendix, i.e. "56789830001", "56789840001", ...
# Each entry is addressed by an SQL criteria built from its other properties
# (values quoted via quotesql). Entries for which no criteria can be built
# are skipped with a warning instead of being updated unrestricted.
sub _prepareNode_DummyIdent {
  my $self = shift;
  my $descent = shift;

  $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

  my $list = $self->_getNodeList($descent);
  my $i = 0;
  my $ident_base = 5678983;
  my $ident_appendix = '0001';

  NODE:
  foreach my $node (@$list) {
    my $ident_dummy = ($i + $ident_base) . $ident_appendix;
    my $map = {
      $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
      cs => undef,
    };

    # diff lists and ...
    my $diff = getDifference([keys %$node], [keys %$map]);
    next NODE if !@$diff;

    # ... build criteria including all columns
    my @crits;
    foreach my $property (@$diff) {
      next if !$property;
      my $value = $node->{$property};
      # NULL/empty values can't be matched with "=", but "0" is a real value
      # and has to narrow the criteria as well
      next if !defined $value || $value eq '';
      push @crits, "$property='" . quotesql($value) . "'";
    }

    # an empty criteria string gives _modifyNode nothing that restricts the
    # update to this very entry - skip rather than risk hitting other rows
    if (!@crits) {
      $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: can not build criteria for node - skipping" );
      next NODE;
    }

    my $crit = join ' AND ', @crits;
    print "p" if $self->{verbose};
    $self->_modifyNode($descent, 'update', $map, $crit);
    $i++;
  }

  print "\n" if $self->{verbose};

  if (!$i) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
  }

  return;
}
1059
# TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
# Map a synchronization side to its counterpart ('source' <-> 'target').
# Any other value yields the empty string.
sub _otherSide {
  my ($self, $side) = @_;
  my %counterpart_of = ( source => 'target', target => 'source' );
  return exists $counterpart_of{$side} ? $counterpart_of{$side} : '';
}
1068
1069
1070 =pod
1071
1072
1073 =head1 DESCRIPTION
1074
Data::Transfer::Sync is a module providing a generic synchronization process
across arbitrary/multiple storages based on an ident/checksum mechanism.
It sits on top of Data::Storage.
1078
1079
1080 =head1 REQUIREMENTS
1081
1082 For full functionality:
1083 Data::Storage
1084 Data::Transform
1085 Data::Compare
1086 ... and all their dependencies
1087
1088
1089 =head1 AUTHORS / COPYRIGHT
1090
The Data::Transfer::Sync module is Copyright (c) 2002 Andreas Motl.
1092 All rights reserved.
1093
1094 You may distribute it under the terms of either the GNU General Public
1095 License or the Artistic License, as specified in the Perl README file.
1096
1097
1098 =head1 SUPPORT / WARRANTY
1099
Data::Transfer::Sync is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1101
1102
1103
1104 =head1 BUGS
1105
When in "import" mode for Windows files, DBD::AutoCSV may hang.
Hint: Maybe the source node contains an ident column, but no checksum column?
1108
1109
1110 =head1 USER LEVEL ERRORS
1111
1112 =head4 Mapping
1113
1114 - - - - - - - - - - - - - - - - - - - - - - - - - -
info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
critical: BizWorks::Process::Setup->startSync: Can't access mapping for node "Currency" - please check BizWorks::ResourceMapping.
1116 - - - - - - - - - - - - - - - - - - - - - - - - - -
You have to create a sub for each node used in synchronization inside the named Perl module. The name of this sub _must_ match
the name of the node you want to sync. This sub holds mapping metadata that gives the engine hints about how
to access the otherwise generic nodes.
1120 - - - - - - - - - - - - - - - - - - - - - - - - - -
1121
1122
1123 =head4 DBD::AutoCSV's rulebase
1124
1125 - - - - - - - - - - - - - - - - - - - - - - - - - -
1126 info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
1127 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1128
Execution ERROR: Error while scanning: Missing first row or scanrule not applied. at C:/home/amo/develop/netfrag.org/nfo/perl/libs/DBD/CSV.pm line 165, <GEN9> line 1.
1131 called from C:/home/amo/develop/netfrag.org/nfo/perl/libs/Data/Storage/Handler/DBI.pm at 123.
1132
1133 DBI-Error: DBD::AutoCSV::st fetchrow_hashref failed: Attempt to fetch row from a Non-SELECT statement
1134 notice: Data::Transfer::Sync->syncNodes: No nodes to synchronize.
1135 - - - - - - - - - - - - - - - - - - - - - - - - - -
DBD::AutoCSV contains a rulebase which is spooled down while attempting to guess the style of the CSV file regarding
parameters like newline (eol), column-separation character (sep_char) and quoting character (quote_char).
If this spool runs out of entries and no style could be resolved, DBD::CSV dies, causing this "Execution ERROR", which
results in a "DBI-Error" afterwards.
1140 - - - - - - - - - - - - - - - - - - - - - - - - - -
1141
1142
1143 =head4 Check structure of source node
1144
1145 - - - - - - - - - - - - - - - - - - - - - - - - - -
1146 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1147 critical: Data::Transfer::Sync->syncNodes: Can not synchronize: No ident found in source node, maybe try to "import" this node first.
1148 - - - - - - - - - - - - - - - - - - - - - - - - - -
If low-level detection succeeds, but other required information is not found, this message is issued.
"Other information" might be:
1151 - column-header-row completely missing
1152 - ident column is empty
1153 - - - - - - - - - - - - - - - - - - - - - - - - - -
1154
1155
1156 =head4 Modify structure of source node
1157
1158 - - - - - - - - - - - - - - - - - - - - - - - - - -
1159 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1160 info: Data::Transfer::Sync->_prepareNode_MetaProperties( descent source )
1161 warning: Data::Transfer::Sync->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter...
1162 SQL ERROR: Command 'ALTER' not recognized or not supported!
1163
1164 SQL ERROR: Command 'ALTER' not recognized or not supported!
1165 - - - - - - - - - - - - - - - - - - - - - - - - - -
The engine found a node whose structure does not match the required one. It tries to alter it automatically (only when doing "import"),
but the DBD driver (in this case DBD::CSV) gets in the way, croaking that it is not able to do this.
This could also appear if your database connection has insufficient rights to modify the database structure.
DBD::CSV croaks because it doesn't implement the ALTER command, so please add the columns manually.
1170 Hint: Add columns with the names of your "ident" and "checksum" property specifications.
1171 - - - - - - - - - - - - - - - - - - - - - - - - - -
1172
1173
1174 =head4 Load source node by ident
1175
1176 - - - - - - - - - - - - - - - - - - - - - - - - - -
1177 info: Data::Transfer::Sync->_prepareNode_DummyIdent( descent source )
1178 pcritical: Data::Transfer::Sync->_modifyNode failed: "source" node is empty.
1179 - - - - - - - - - - - - - - - - - - - - - - - - - -
1180 The source node could not be loaded. Maybe the ident is missing. Please check manually.
1181 Hint: Like above, the ident and/or checksum columns may be missing....
1182 - - - - - - - - - - - - - - - - - - - - - - - - - -
1183
1184
1185 =head1 TODO
1186
1187 - sub _resolveIdentProvider
- wrap _doModifySource_IdentChecksum and _doTransferToTarget around a core function which can change virtually any type of node
1189 - split this module up into Sync.pm, Sync/Core.pm, Sync/Compare.pm and Sync/Compare/Checksum.pm
1190 - introduce _compareNodes as a core method and wrap it around methods in Sync/Compare/Checksum.pm
1191 - introduce Sync/Compare/MyComparisonImplementation.pm
1192 - some generic deferring method - e.g. "$self->defer(action)" - to be able to accumulate a bunch of actions for later processing
1193 - this implies everything done is _really_ split up into generic actions - how else would we defer them???
1194 - example uses:
1195 - fetch whole checksum list from node
1196 - remember source ident retransmits
- remember: this is convenient - and maybe / of course faster - but we'll lose "per-node-atomic" operations
- feature: mechanism to implicitly inject checksum property to nodes (alter table / modify schema)
1199 - expand statistics / keep track of:
1200 - touched/untouched nodes
1201 - full sync
1202 - just do a push and a pull for now but use stats for touched nodes in between to speed up things
1203 - introduce some new metadata flags for a synchronization partner which is (e.g.) of "source" or "target":
1204 - isNewNodePropagator
1205 - isWriteProtected
1206
1207
1208 =cut
1209
1210 1;

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed