/[cvs]/nfo/perl/libs/Data/Transfer/Sync.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync.pm

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1.3 - (show annotations)
Sun Dec 1 22:26:59 2002 UTC (21 years, 7 months ago) by joko
Branch: MAIN
Changes since 1.2: +10 -2 lines
+ minor cosmetics for logging

1 ## $Id: Sync.pm,v 1.2 2002/12/01 04:43:25 joko Exp $
2 ##
3 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4 ##
5 ## See COPYRIGHT section in pod text below for usage and distribution rights.
6 ##
7 ## ----------------------------------------------------------------------------------------
8 ## $Log: Sync.pm,v $
9 ## Revision 1.2 2002/12/01 04:43:25 joko
10 ## + mapping detail entries may now be either an ARRAY or a HASH
11 ## + erase flag is used now (for export-operations)
12 ## + expressions to refer to values inside deep nested structures
13 ## - removed old mappingV2-code
14 ## + cosmetics
15 ## + sub _erase_all
16 ##
17 ## Revision 1.1 2002/11/29 04:45:50 joko
18 ## + initial check in
19 ##
20 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
21 ## + new
22 ## ----------------------------------------------------------------------------------------
23
24
25 package Data::Transfer::Sync;
26
27 use strict;
28 use warnings;
29
30 use Data::Dumper;
31 use misc::HashExt;
32 use libp qw( md5_base64 );
33 use libdb qw( quotesql hash2Sql );
34 use Data::Transform::Deep qw( hash2object refexpr2perlref );
35 use Data::Compare::Struct qw( getDifference isEmpty );
36
37 # get logger instance
38 my $logger = Log::Dispatch::Config->instance;
39
40
# Constructor.
# May be invoked on the class or on an existing instance; all remaining
# arguments are taken as key/value pairs and become the object's attributes.
# Calls _init() on the fresh object before returning it.
sub new {
    my ($proto, @options) = @_;

    # support both Class->new(...) and $object->new(...)
    my $class = ref($proto) || $proto;

    my $self = bless { @options }, $class;
    $logger->debug( __PACKAGE__ . "->new(@options)" );

    $self->_init();

    return $self;
}
50
51
# Prepares the freshly constructed object:
#  - creates a Data::Storage::Container unless one was passed in as "container"
#  - registers every entry of the optional "storages" hash with the container
#  - flags the storages named in "id_authorities", "checksum_authorities"
#    and "write_protected" accordingly
# Returns nothing.
sub _init {
    my $self = shift;

    # build new container if necessary
    $self->{container} = Data::Storage::Container->new() if !$self->{container};

    # add storages to container (optional)
    foreach my $storage_key (keys %{$self->{storages}}) {
        $self->{container}->addStorage($storage_key, $self->{storages}->{$storage_key});
    }

    # tag storages with id-authority, checksum-provider and write-protection information
    # TODO: better store tag inside metadata to hold bits together!
    my %flag_for = (
        id_authorities       => 'isIdentAuthority',
        checksum_authorities => 'isChecksumAuthority',
        write_protected      => 'isWriteProtected',
    );
    foreach my $list_name (qw( id_authorities checksum_authorities write_protected )) {
        foreach my $storage_key (@{$self->{$list_name}}) {
            $self->{container}->{storage}->{$storage_key}->{ $flag_for{$list_name} } = 1;
        }
    }

    return;
}
70
71
72 # TODO: some feature to show off the progress of synchronization (cur/max * 100)
# Public entry point of a synchronization run.
#
# Expects a hash reference of arguments; the keys read in here are:
#   direction                    'push', 'pull' or 'full' (case-insensitive)
#   source, target               "<storage key>:<node name>", e.g. "L:Country"
#   {source,target}_filter       filter passed on when fetching the node list
#   {source,target}_ident        "<method>:<arg>" describing the IdentProvider
#   {source,target}_type         "<method>:<arg>" describing the TypeProvider
#   {source,target}_exclude      array ref of properties/subnodes to exclude
#   {source,target}_callbacks_write / _callbacks_read   array refs of property names
#   mapping                      array ref of mapping entries (ARRAY or HASH each)
#   import, erase                flags, see below
#
# The arguments are decomposed into $self->{meta}, the partners are checked,
# and the actual work is delegated to _syncNodes(), whose result is returned.
# Returns nothing if a partner or its node can not be reached.
sub syncNodes {

    my $self = shift;
    my $args = shift;

    # remember arguments through the whole processing
    $self->{args} = $args;

    $logger->debug( __PACKAGE__ . "->syncNodes: starting" );

    # hash to hold and/or fill in metadata required for the processing
    $self->{meta} = {};

    # normalize the direction once (a missing direction counts as empty)
    my $direction = $self->{args}->{direction};
    $direction = '' if !defined $direction;
    $direction = lc $direction;

    # detect synchronization method to determine which optical symbol (directed arrow) to use
    my %arrow_for = ( push => '->', pull => '<-', full => '<->' );
    my $direction_arrow = exists $arrow_for{$direction} ? $arrow_for{$direction} : '';

    # decompose identifiers for each partner
    # TODO: take this list from already established/given metadata
    foreach my $descent ('source', 'target') {

        my $meta = $self->{meta}->{$descent} ||= {};

        # Partner and Node (e.g.: "L:Country" or "R:countries.csv")
        if (my $item = $self->{args}->{$descent}) {
            my @item = split(':', $item);
            $meta->{dbkey} = $item[0];
            $meta->{node}  = $item[1];
        }

        # Filter
        if (my $item_filter = $self->{args}->{$descent . '_filter'}) {
            $meta->{filter} = $item_filter;
        }

        # IdentProvider
        if (my $item_ident = $self->{args}->{$descent . '_ident'}) {
            my @item_ident = split(':', $item_ident);
            $meta->{IdentProvider} = { method => $item_ident[0], arg => $item_ident[1] };
        }

        # TODO: ChecksumProvider

        # exclude properties/subnodes
        if (my $item_exclude = $self->{args}->{$descent . '_exclude'}) {
            $meta->{subnodes_exclude} = $item_exclude;
        }

        # TypeProvider
        if (my $item_type = $self->{args}->{$descent . '_type'}) {
            my @item_type = split(':', $item_type);
            $meta->{TypeProvider} = { method => $item_type[0], arg => $item_type[1] };
        }

        # Callbacks - writers (will be triggered _before_ writing to target)
        if (my $item_writers = $self->{args}->{$descent . '_callbacks_write'}) {
            $meta->{Callback}->{write}->{$_}++ for @$item_writers;
        }

        # Callbacks - readers (will be triggered _after_ reading from source)
        if (my $item_readers = $self->{args}->{$descent . '_callbacks_read'}) {
            $meta->{Callback}->{read}->{$_}++ for @$item_readers;
        }

        # resolve storage object and link it into the metainfo
        $meta->{storage} = defined $meta->{dbkey}
            ? $self->{container}->{storage}->{ $meta->{dbkey} }
            : undef;
    }

    $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" );

    # build mapping
    # incoming: an Array of node map entries (Array or Hash) - e.g.
    #   [ 'source:item_name' => 'target:class_val' ]
    #   { source => 'event->startDateTime', target => 'begindate' }
    # (iterate over a fallback list so a missing "mapping" argument is not
    #  autovivified into an empty - but true - array reference)
    foreach my $entry (@{ $self->{args}->{mapping} || [] }) {
        if (ref $entry eq 'ARRAY') {
            foreach my $spec ($entry->[0], $entry->[1]) {
                my @parts = split(':', $spec);
                push @{$self->{meta}->{$parts[0]}->{childnodes}}, $parts[1];
            }
        } elsif (ref $entry eq 'HASH') {
            foreach my $entry_key (keys %$entry) {
                push @{$self->{meta}->{$entry_key}->{childnodes}}, $entry->{$entry_key};
            }
        }
    }

    # check partners/nodes: does partner exist / is node available?
    foreach my $partner (keys %{$self->{meta}}) {
        my $storage = $self->{meta}->{$partner}->{storage};

        # an unknown storage key must be reported, not crash on a method call later
        if (!ref $storage) {
            $logger->critical( __PACKAGE__ . "->syncNodes: No storage available for \"$partner\"." );
            return;
        }

        next if ($storage->{locator}->{type} || '') eq 'DBI';   # for DBD::CSV - re-enable for others

        my $node = $self->{meta}->{$partner}->{node};
        if (!$storage->existsChildNode($node)) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach \"$node\" at \"$partner\"." );
            return;
        }
    }

    # TODO:
    #   + if action == PUSH: start processing
    #  -+ if action == PULL: swap metadata and start processing
    #   - if action == FULL: start processing, then swap metadata and (re-)start processing

    # manipulate metainfo according to direction of synchronization:
    # only "pull" needs a change (swap the partners), "push" runs as given
    # and "full" is not handled specially yet
    if ($direction eq 'pull') {
        ($self->{meta}->{source}, $self->{meta}->{target}) =
            ($self->{meta}->{target}, $self->{meta}->{source});
    }

    # import flag means: prepare the source node to be syncable
    # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage
    if ($self->{args}->{import}) {
        $self->_prepareNode_MetaProperties('source');
        $self->_prepareNode_DummyIdent('source');
    }

    # erase flag means: erase the target
    if ($self->{args}->{erase}) {
        # TODO: move this method to the scope of the synchronization core and wrap it around different handlers
        $self->_erase_all('target');
    }

    return $self->_syncNodes();

}
231
232
233 # TODO: abstract the hardwired use of "source" and "target" in here somehow - hmmmm....... /(="§/%???
# Core loop of the synchronization; works on the metadata prepared by syncNodes().
#
# Fetches the list of source nodes (from args->{objectSet}, or via
# _getNodeList with or without the source filter) and, for each node:
#   1. runs the "read" callbacks and drops excluded properties
#   2. resolves the ident and tries to load the counterpart from the target
#   3. if args->{method} is "checksum": compares checksums to classify the
#      node as new / dirty / in sync
#   4. inserts or updates the target (unless it is write-protected)
#   5. writes ident and checksum back to the source if the target is an
#      IdentAuthority
#
# Returns the statistics object ($tc), or nothing if there is no node list
# or a source node has no ident (which aborts the whole run).
sub _syncNodes {

    my $self = shift;

    my $tc = OneLineDumpHash->new( {} );
    my $results;

    # set of objects is already in $self->{args}
    # TODO: make independent of the terminology "object..."
    $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};

    # apply filter
    if (my $filter = $self->{meta}->{source}->{filter}) {
        $results ||= $self->_getNodeList('source', $filter);
    }

    # fall back to the unfiltered node list
    $results ||= $self->_getNodeList('source');

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@{$results}) {
        $logger->notice( __PACKAGE__ . "->syncNodes: No nodes to synchronize." );
        return;
    }

    # a missing method never selects checksum mode (and must not warn)
    my $use_checksum = lc( defined $self->{args}->{method} ? $self->{args}->{method} : '' ) eq 'checksum';

    # iterate through set
    foreach my $source_node_real (@{$results}) {

        $tc->{total}++;

        # NOTE: this is NOT a copy - modifications below hit the original node
        # TODO:
        #   - is a "deep_copy" needed here if modifications take place?
        #   - a deep_copy might destroy tangram mechanisms
        #   - for now, just take care that this object doesn't get updated!
        my $source_node = $source_node_real;

        my $descent = 'source';

        # handle read callbacks right now while scanning them (asymmetric to the writers)
        if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {

            foreach my $node (keys %{$callbacks->{read}}) {

                # NOTE: $object, $node, $value, $self and $descent are referenced
                # by name from inside the eval string below - do not rename them.
                my $object = $source_node;
                my $value;

                # ------------ half-redundant: make $self->callCallback($object, $value, $opts)
                # NOTE(review): string eval; the callback name is built from
                # configuration (node name + property name) - keep that input trusted.
                my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $node . '_read';
                my $evalstring = 'return ' . $perl_callback . '( { object => $object, property => $node, value => $value, storage => $self->{meta}->{$descent}->{storage} } );';
                my $cb_result = eval($evalstring);

                # a failing read callback is fatal for the whole run
                die $@ if $@;
                # ------------ half-redundant: make $self->callCallback($object, $value, $opts)

                $source_node->{$node} = $cb_result;

            }

        }

        # exclude defined fields (simply delete from object)
        delete $source_node->{$_} for @{$self->{meta}->{source}->{subnodes_exclude}};

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = {};
        $self->{node}->{source}->{payload} = $source_node;

        # determine ident of entry
        my $identOK = $self->_resolveNodeIdent('source');
        if (!$identOK) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." );
            return;
        }

        my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

        # mark node as new either if there's no ident or if stat/load failed
        if (!$statOK) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $self->{verbose};
        }

        # determine status of entry by synchronization method
        if ($use_checksum) {

            # TODO:
            #   is this really worth a "critical"???
            #   no - it should just be a debug appendix i believe

            # get checksum of source node
            if (!$self->_readChecksum('source')) {
                $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $self->{verbose};
                next;
            }

            # get checksum from synchronization target
            $self->_readChecksum('target');

            # determine if entry is "new" or "dirty"
            # after all, this seems to be the point where the hammer falls.....
            print "c" if $self->{verbose};
            $self->{node}->{status}->{new} = !$self->{node}->{target}->{checksum};
            if (!$self->{node}->{status}->{new}) {
                # ("new" is known to be false here, so it takes no part in the decision)
                $self->{node}->{status}->{dirty} =
                    (!$self->{node}->{source}->{checksum} || !$self->{node}->{target}->{checksum}) ||
                    ($self->{node}->{source}->{checksum} ne $self->{node}->{target}->{checksum}) ||
                    $self->{args}->{force};
            }

        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next;
        }

        # build map to actually transfer the data from source to target
        $self->_buildMap();

        # additional (new) checks for feature "write-protection"
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $self->{verbose};
            $logger->notice( __PACKAGE__ . "->syncNodes: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $self->{verbose};
            $tc->{skip}++;
            next;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->_readChecksum('target');

        } elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}};
            $self->_doTransferToTarget('update');
            $self->_readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $self->{verbose};
        }

        if ($self->{node}->{status}->{error}) {
            $tc->{error}++;
            push( @{$tc->{error_per_row}}, $self->{node}->{status}->{error} );
            print "e" if $self->{verbose};
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) {
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
            print "r" if $self->{verbose};
        }

        print ":" if $self->{verbose};

    }

    print "\n" if $self->{verbose};

    # build user-message from some stats
    my $msg = "statistics: $tc";

    if ($tc->{error_per_row}) {
        $msg .= "\n";
        $msg .= "errors from \"error_per_row\":" . "\n";
        $msg .= Dumper($tc->{error_per_row});
    }

    # todo!!!
    #sysevent( { usermsg => $msg, level => $level }, $taskEvent );
    $logger->info( __PACKAGE__ . "->syncNodes: $msg" );

    return $tc;

}
478
479
# Returns a compact, single-line Data::Dumper string of the given hash
# references (one $VARn per argument). Used as input for checksums.
#
# Values that are Set::Object instances are replaced by the result of
# their members() method; everything else is dumped as is.
#
# The Data::Dumper settings are changed with "local" only, so the caller's
# configuration is left untouched. Hash keys are dumped in sorted order so
# the result does not depend on Perl's internal hash ordering.
# NOTE: because of the sorted keys, a dump (and any checksum derived from it)
# can differ from one produced by an earlier, unsorted version of this sub.
sub _dumpCompact {
    my $self = shift;

    my @data = ();

    foreach my $struct (@_) {
        my $item = {};
        foreach my $key (keys %$struct) {
            my $val = $struct->{$key};
            if (ref $val eq 'Set::Object') {
                # NOTE(review): members() is called in scalar context here,
                # exactly as before - confirm this yields what the dump needs.
                $item->{$key} = $val->members();
            } else {
                $item->{$key} = $val;
            }
        }
        push @data, $item;
    }

    local $Data::Dumper::Indent   = 0;
    local $Data::Dumper::Sortkeys = 1;

    # force scalar context: in list context Dumper() returns one string per value
    my $result = Dumper(@data);
    return $result;
}
516
517
# Calculates the checksum of the current node of the given descent
# ('source' or 'target') and stores it in $self->{node}->{$descent}->{checksum}.
#
# The checksum is DBI::hash (type 1) over the node's ident, a newline and the
# compact dump of its payload. A third positional argument is accepted but
# not used. Always returns 1.
sub _calcChecksum {
    my ($self, $descent) = @_;

    my $current = $self->{node}->{$descent} ||= {};

    # text to fingerprint: ident plus compact dump of the payload
    my $fingerprint_input = join "\n", $current->{ident}, $self->_dumpCompact($current->{payload});

    # 32-bit integer "hash" value (from DBI)
    $current->{checksum} = DBI::hash($fingerprint_input, 1);

    # signal good
    return 1;
}
547
548
# Determines the checksum of the current node of the given descent and
# stores it in $self->{node}->{$descent}->{checksum}.
#
# If the descent's storage is flagged as checksum authority, the checksum is
# calculated via _calcChecksum(); otherwise it is taken from the payload's
# "cs" property.
# TODO: don't have the checksum column/property hardcoded as "cs" here, make this configurable somehow
#
# Returns 1 on success, nothing if there is no current node for the descent.
sub _readChecksum {
    my ($self, $descent) = @_;

    my $current = $self->{node}->{$descent};

    # signal checksum bad
    return unless $current;

    if (!$self->{meta}->{$descent}->{storage}->{isChecksumAuthority}) {
        # checksum travels with the data
        $current->{checksum} = $current->{payload}->{cs};
    } else {
        # this side is responsible for computing it
        $self->_calcChecksum($descent);
    }

    # signal checksum good
    return 1;
}
580
581
# Builds the transfer map ($self->{node}->{map}) for the current node:
# a flat hash of target property name => value to be written to the target.
#
# Always contains:
#   - the source ident, stored under the name given by the target's IdentProvider arg
#   - the source checksum, stored under "cs"
# If a mapping was given, each source childnode is paired by position with the
# target childnode of the same index and its value is added to the map:
#   - plain properties are read from the source payload
#   - properties prefixed with "expr:" are resolved via refexpr2perlref()
#   - values are passed through quotesql() if the target storage is of type DBI
# Returns nothing.
sub _buildMap {

    my $self = shift;

    # field-structure for building sql
    # mapping of sql-fieldnames to object-attributes
    $self->{node}->{map} = {};

    # manually set ...
    # ... object-id
    $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}} = $self->{node}->{source}->{ident};
    # ... checksum
    $self->{node}->{map}->{cs} = $self->{node}->{source}->{checksum};

    # for transferring flat structures via simple (1:1) mapping
    # TODO: diff per property / property value
    return if !$self->{args}->{mapping};

    # an empty mapping leaves no childnodes behind - treat that as "nothing to map"
    my @childnodes = @{ $self->{meta}->{source}->{childnodes} || [] };

    # the target storage type does not change while iterating
    my $storage_type = $self->{meta}->{target}->{storage}->{locator}->{type};
    $storage_type = '' if !defined $storage_type;

    foreach my $mapidx (0 .. $#childnodes) {

        my $source_cache = $self->{node}->{source}->{propcache} = {};
        my $target_cache = $self->{node}->{target}->{propcache} = {};

        # get property name
        $source_cache->{property} = $self->{meta}->{source}->{childnodes}->[$mapidx];
        $target_cache->{property} = $self->{meta}->{target}->{childnodes}->[$mapidx];

        # get property value
        # (old style callbacks - a CODE ref in the target's "map" slot - are not
        #  evaluated; in that case no plain value is read either)
        if (ref($self->{node}->{target}->{map}) ne 'CODE') {
            # plain (scalar?) value
            $source_cache->{value} = $self->{node}->{source}->{payload}->{ $source_cache->{property} };
        }

        # detect expression
        # for transferring deeply nested structures described by expressions
        # (the "expr:" prefix is stripped from the cached property name)
        if ($source_cache->{property} =~ s/^expr://) {
            # callback map for the helper function (currently empty)
            my $cbmap = {};
            $source_cache->{value} = refexpr2perlref($self->{node}->{source}->{payload}, $source_cache->{property}, $cbmap);
        }

        # encode values dependent on type of underlying storage here - expand cases...
        if ($storage_type eq 'DBI') {
            # ...for sql
            $source_cache->{value} = quotesql($source_cache->{value});
        }
        # 'Tangram': iso? utf8 already possible?
        # 'LDAP':    TODO: encode utf8 here?

        # store value to transfer map
        $self->{node}->{map}->{ $target_cache->{property} } = $source_cache->{value};

    }

    # TODO: $logger->dump( ... );

    return;
}
678
# Resolves the ident of the current node of the given descent and stores it
# in $self->{node}->{$descent}->{ident}.
#
# How the ident is obtained is described by the descent's IdentProvider metadata:
#   method 'property'        - read the payload property named by "arg"
#   method 'storage_method'  - call the storage method named by "arg" with the payload
# Any other (or missing) method leaves the ident undefined.
#
# Returns 1 if a true ident was found, nothing otherwise.
sub _resolveNodeIdent {
    my $self = shift;
    my $descent = shift;

    # get to the payload
    my $payload = $self->{node}->{$descent}->{payload};

    # we use global metadata and the given descent for this task
    my $provider        = $self->{meta}->{$descent}->{IdentProvider} || {};
    my $provider_method = defined $provider->{method} ? $provider->{method} : '';
    my $provider_arg    = $provider->{arg};

    # resolve to ident (impossible without an argument naming the property/method)
    my $ident;
    if (defined $provider_arg) {
        if ($provider_method eq 'property') {
            $ident = $payload->{$provider_arg};

        } elsif ($provider_method eq 'storage_method') {
            $ident = $self->{meta}->{$descent}->{storage}->$provider_arg($payload);
        }
    }

    $self->{node}->{$descent}->{ident} = $ident;

    return 1 if $ident;
    return;

}
712
713
# Writes a node to the storage of the given descent ('source' or 'target').
#
# Arguments:
#   $descent  which partner to write to
#   $action   'insert' or 'update' (case-insensitive)
#   $map      hash ref: property name => value; NOTE: entries are removed from
#             this hash in here (write callbacks, excluded properties)
#   $crit     optional criteria string for a DBI update; defaults to
#             "<ident property>='<ident of the current node>'"
#
# Storages of type 'DBI' are written via SQL built by hash2Sql(), storages of
# type 'Tangram' via the object interface (insert/update/load). Other storage
# types are ignored.
#
# The outcome is recorded in $self->{node}->{status}: {ok} is set to 1 on
# success, {error} to a hash describing the failure otherwise.
sub _modifyNode {
    my $self = shift;
    my $descent = shift;
    my $action = shift;
    my $map = shift;
    my $crit = shift;

    # map for new style callbacks
    # NOTE: referenced by name from inside the eval string below - do not rename
    my $map_callbacks = {};

    # TODO: this should be reviewed first - before extending ;-)
    # TODO: this should be extended:
    #   count this cases inside the caller to this sub and provide a better overall message
    #   if this counts still zero in the end:
    #     "No nodes have been touched for modify: Do you have column-headers in your csv file?"

    # transfer callback nodes from value map to callback map - handle them afterwards! - (new style callbacks)
    if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {
        foreach my $callback (keys %{$callbacks->{write}}) {
            $map_callbacks->{write}->{$callback} = $map->{$callback};
            delete $map->{$callback};
        }
    }

    my $storage_type = $self->{meta}->{$descent}->{storage}->{locator}->{type};
    $storage_type = '' if !defined $storage_type;

    my $action_lc = lc( defined $action ? $action : '' );

    # DBI speaks SQL
    if ($storage_type eq 'DBI') {

        # translate map to sql
        my $sql_main;
        if ($action_lc eq 'insert') {
            $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_INSERT');
        } elsif ($action_lc eq 'update') {
            # NOTE(review): the ident is interpolated into the criteria as is -
            # confirm it can never contain a quote character.
            $crit ||= "$self->{meta}->{$descent}->{IdentProvider}->{arg}='$self->{node}->{$descent}->{ident}'";
            $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_UPDATE', $crit);
        } else {
            # nothing sensible to send - report instead of issuing an undefined command
            $self->{node}->{status}->{error} = {
                errstr => "unsupported action \"$action_lc\"",
            };
            return;
        }

        # transfer data
        my $sqlHandle = $self->{meta}->{$descent}->{storage}->sendCommand($sql_main);

        # handle errors
        if ($sqlHandle->err) {
            $self->{node}->{status}->{error} = {
                statement => $sql_main,
                state => $sqlHandle->state,
                err => $sqlHandle->err,
                errstr => $sqlHandle->errstr,
            };
        } else {
            $self->{node}->{status}->{ok} = 1;
        }

    # Tangram does it the oo-way (naturally)
    } elsif ($storage_type eq 'Tangram') {

        # NOTE: referenced by name from inside the eval string below - do not rename
        my $object;

        # determine classname
        my $classname = $self->{meta}->{$descent}->{node};

        # properties to exclude (there may be no exclude list at all)
        my @exclude = @{ $self->{meta}->{$descent}->{subnodes_exclude} || [] };

        if (my $identProvider = $self->{meta}->{$descent}->{IdentProvider}) {
            push @exclude, $identProvider->{arg};
        }

        # new feature:
        #   - check TypeProvider metadata property
        #   - use the map value named by its argument (arg) as classname for object creation
        if (my $typeProvider = $self->{meta}->{$descent}->{TypeProvider}) {
            $classname = $map->{$typeProvider->{arg}};
            # remove nodes from map also (push nodes to "subnodes_exclude" list)
            push @exclude, $typeProvider->{arg};
        }

        # exclude banned properties (remove from map)
        delete $map->{$_} for grep { defined } @exclude;

        # list of properties
        my @props = keys %{$map};

        # transfer data
        if ($action_lc eq 'insert') {

            # make the object persistent in four steps:
            #   - raw create (perl / class tangram scope)
            #   - engine insert (tangram scope) ... this establishes inheritance - don't try to fill in inherited properties before!
            #   - raw fill-in from hash (perl scope)
            #   - engine update (tangram scope) ... this updates all properties just filled in

            # create new object ...
            $object = $classname->new();

            # ... pass to orm ...
            $self->{meta}->{$descent}->{storage}->insert($object);

            # ... and initialize with empty (undef'd) properties.
            $object->{$_} = undef for @props;

            # mix in values ...
            hash2object($object, $map);

            # ... and re-update@orm.
            $self->{meta}->{$descent}->{storage}->update($object);

            # asymmetry: get ident after insert
            # TODO:
            #   - just do this if it is an IdentAuthority
            #   - use IdentProvider metadata here
            $self->{node}->{$descent}->{ident} = $self->{meta}->{$descent}->{storage}->id($object);

        } elsif ($action_lc eq 'update') {

            # get fresh object from orm first
            $object = $self->{meta}->{$descent}->{storage}->load($self->{node}->{$descent}->{ident});

            # mix in values
            hash2object($object, $map);
            $self->{meta}->{$descent}->{storage}->update($object);
        }

        my $error = 0;
        my @callback_errors = ();

        # handle new style callbacks - this is a HACK - do this without an eval!
        # NOTE(review): string eval; the callback name is built from configuration
        # (node name + property name) - keep that input trusted.
        foreach my $node (keys %{$map_callbacks->{write}}) {
            my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $node . '_write';
            my $evalstring = $perl_callback . '( { object => $object, value => $map_callbacks->{write}->{$node}, storage => $self->{meta}->{$descent}->{storage} } );';
            eval($evalstring);
            if ($@) {
                $error = 1;
                push @callback_errors, "$@";
                print $@, "\n";
            }

            # once a callback failed, no further re-updates are done
            if (!$error) {
                # re-update@orm
                $self->{meta}->{$descent}->{storage}->update($object);
            }
        }

        # handle errors
        if ($error) {
            # make the failure visible to the caller's statistics
            $self->{node}->{status}->{error} = {
                errstr => join('', @callback_errors),
            };
            # rollback: only an object created by this very call may be removed
            # again - an object that existed before an update must survive
            if ($action_lc eq 'insert') {
                $self->{meta}->{$descent}->{storage}->erase($object);
            }
        } else {
            $self->{node}->{status}->{ok} = 1;
        }

    }

}
919
920 # TODO:
921 # this should be split up into...
922 # - a "_statNode" (should just touch the node to check for existence)
923 # - a "_loadNode" (should load node completely)
924 # - maybe additionally a "loadNodeProperty" (may specify properties to load)
925 # - introduce $self->{nodecache} for this purpose
926 # TODO:
927 # should we:
928 # - not pass ident in here but resolve it via "$descent"?
929 # - refactor this and stuff it with additional debug/error message
930 # - this = the way the implicit load mechanism works
# Makes sure the node with the given ident is loaded for the given descent.
#
# Arguments:
#   $descent  'source' or 'target'
#   $ident    ident to look the node up by
#   $force    if true, query the storage even if a node is already present
#
# If a node is already present for the descent (and no reload is forced),
# nothing is queried. Otherwise the descent's storage is asked (sendQuery)
# for the entry whose IdentProvider property equals $ident; on success ident
# and payload of the node are set.
#
# Returns 1 if the node is available, nothing if no ident was given, the
# query reported an error or no entry was found.
sub _statloadNode {
    my ($self, $descent, $ident, $force) = @_;

    # already there and no reload requested: done
    return 1 if $self->{node}->{$descent} && !$force;

    # can't fetch an entry without knowing its ident
    return unless $ident;

    # fetch entry to retrieve checksum from
    my %query = (
        node      => $self->{meta}->{$descent}->{node},
        subnodes  => [qw( cs )],
        criterias => [
            {
                key => $self->{meta}->{$descent}->{IdentProvider}->{arg},
                op  => 'eq',
                val => $ident,
            },
        ],
    );
    my $result = $self->{meta}->{$descent}->{storage}->sendQuery(\%query);

    my $entry  = $result->getNextEntry();
    my $status = $result->getStatus();

    # TODO: enhance error handling (store inside tc)
    my $failed = $status && $status->{err};
    return if $failed || !$entry;

    # TODO: re-resolve ident from entry via metadata "IdentProvider"
    $self->{node}->{$descent}->{ident}   = $ident;
    $self->{node}->{$descent}->{payload} = $entry;

    return 1;
}
982
# Applies $action to the 'target' side, using the map currently held in
# $self->{node}->{map}. Thin wrapper: hands back whatever _modifyNode returns.
#
# Parameters:
#   $action - passed through to _modifyNode unchanged
sub _doTransferToTarget {
    my ( $self, $action ) = @_;
    my $map = $self->{node}->{map};
    return $self->_modifyNode( 'target', $action, $map );
}
988
# Rewrites the source node so that it carries a new ident together with the
# checksum currently stored for the target ($self->{node}->{target}->{checksum}).
#
# Parameters:
#   $ident_new - value written to the source's ident property; the property
#                name is taken from $self->{meta}->{source}->{IdentProvider}->{arg}
#
# Hands back whatever _modifyNode returns.
#
# TODO:
#   - eventually introduce an external resource to store this data to
#   - we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
    my ( $self, $ident_new ) = @_;

    my $ident_property = $self->{meta}->{source}->{IdentProvider}->{arg};
    my %update = (
        $ident_property => $ident_new,
        cs              => $self->{node}->{target}->{checksum},
    );

    return $self->_modifyNode( 'source', 'update', \%update );
}
1005
1006
1007 # this is a shortcut method
1008 # ... let's try to avoid _any_ redundant code in here (ok... - at the cost of method lookups...)
# Shortcut: asks the storage registered for $descent for the (optionally
# filtered) list of its node and hands the storage's answer straight back.
#
# Parameters:
#   $descent - key into $self->{meta}
#   $filter  - passed through to the storage's getListFiltered (may be omitted)
sub _getNodeList {
    my ( $self, $descent, $filter ) = @_;
    my $meta = $self->{meta}->{$descent};
    return $meta->{storage}->getListFiltered( $meta->{node}, $filter );
}
1015
1016
# Inspects the first entry of the node behind $descent for the meta
# properties the sync works with - the ident property named by
# IdentProvider->{arg} plus "cs" - and, when the check below fires, sends one
# "ALTER TABLE <node> ADD COLUMN <property>" command per required property
# to the descent's storage.
#
# Parameters:
#   $descent - key into $self->{meta}
#
# NOTE(review): the alter branch runs when isEmpty() is true for
# getDifference(\@required, \@found). If getDifference yields the required
# names that are missing from @found, an empty result means "nothing is
# missing", which contradicts the warning text - confirm against
# Data::Compare::Struct before touching the condition.
sub _prepareNode_MetaProperties {
    my ( $self, $descent ) = @_;

    $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

    # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
    my $nodes       = $self->_getNodeList($descent);
    my $first_entry = $nodes->[0];

    # property names the node has to provide
    # TODO: "cs" is hardcoded here!
    my $ident_property = $self->{meta}->{$descent}->{IdentProvider}->{arg};
    my @required       = ( $ident_property, 'cs' );
    my @found          = keys %$first_entry;

    my $diff = getDifference( \@required, \@found );

    if ( isEmpty($diff) ) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
        my $table = $self->{meta}->{$descent}->{node};
        for my $column (@required) {
            my $sql = "ALTER TABLE $table ADD COLUMN $column";
            # the command result is fetched but currently not evaluated
            my $result = $self->{meta}->{$descent}->{storage}->sendCommand($sql);
        }
    }

}
1050
# Walks all entries of the node behind $descent and updates each one with a
# generated dummy ident and an undefined checksum ("cs").
#
# The dummy ident is (counter + 5678983) followed by the string '0001'; the
# counter only advances for entries that were actually passed to _modifyNode.
# Each entry is addressed by a criteria string "prop='value' AND ..." built
# from the properties returned by getDifference(), leaving out properties
# with false names or false values.
#
# Parameters:
#   $descent - key into $self->{meta}
#
# Prints a "p" per touched entry and a final newline if $self->{verbose} is
# set; logs a warning if no entry was touched.
sub _prepareNode_DummyIdent {
    my ( $self, $descent ) = @_;

    $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

    my $entries        = $self->_getNodeList($descent);
    my $touched        = 0;
    my $ident_base     = 5678983;
    my $ident_appendix = '0001';

    ENTRY:
    for my $entry (@$entries) {

        my $ident_dummy = ( $touched + $ident_base ) . $ident_appendix;
        my $map = {
            $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
            cs => undef,
        };

        # diff lists and ...
        my $diff = getDifference( [ keys %$entry ], [ keys %$map ] );
        next ENTRY unless @$diff;

        # ... build criteria including all columns
        my @conditions;
        for my $column (@$diff) {
            next if !$column;
            my $value = $entry->{$column};
            next if !$value;
            push @conditions, "$column='" . quotesql($value) . "'";
        }
        my $criteria = join ' AND ', @conditions;

        print "p" if $self->{verbose};
        $self->_modifyNode( $descent, 'update', $map, $criteria );
        $touched++;
    }

    print "\n" if $self->{verbose};

    if ( !$touched ) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
    }

}
1095
1096 # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
# Returns the name of the opposite synchronization side.
#
# Parameters:
#   $descent - 'source' or 'target'
#
# Returns 'target' for 'source', 'source' for 'target' and the empty string
# for anything else. An undefined $descent also yields the empty string and
# no longer triggers "uninitialized value" warnings from the comparison.
#
# TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
sub _otherSide {
    my ( $self, $descent ) = @_;

    # guard: comparing undef with "eq" would warn under "use warnings"
    return '' unless defined $descent;

    my %opposite_of = (
        source => 'target',
        target => 'source',
    );

    return exists $opposite_of{$descent} ? $opposite_of{$descent} : '';
}
1104
# Asks the storage registered for $descent to erase everything in that
# descent's node (as named in $self->{meta}->{$descent}->{node}) and hands
# back whatever the storage's eraseAll returns.
#
# Parameters:
#   $descent - key into $self->{meta}
sub _erase_all {
    my ( $self, $descent ) = @_;
    my $meta = $self->{meta}->{$descent};
    return $meta->{storage}->eraseAll( $meta->{node} );
}
1112
1113
1114 =pod
1115
1116
1117 =head1 DESCRIPTION
1118
1119 Data::Transfer::Sync is a module providing a generic synchronization process
1120 across arbitrary/multiple storages based on an ident/checksum mechanism.
1121 It sits on top of Data::Storage.
1122
1123
1124 =head1 REQUIREMENTS
1125
1126 For full functionality:
1127 Data::Storage
1128 Data::Transform
1129 Data::Compare
1130 ... and all their dependencies
1131
1132
1133 =head1 AUTHORS / COPYRIGHT
1134
1135 The Data::Transfer::Sync module is Copyright (c) 2002 Andreas Motl.
1136 All rights reserved.
1137
1138 You may distribute it under the terms of either the GNU General Public
1139 License or the Artistic License, as specified in the Perl README file.
1140
1141
1142 =head1 SUPPORT / WARRANTY
1143
1144 Data::Transfer::Sync is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1145
1146
1147
1148 =head1 BUGS
1149
1150 When in "import" mode for windows file - DBD::AutoCSV may hang.
1151 Hint: Maybe the source node contains an ident-, but no checksum-column?
1152
1153
1154 =head1 USER LEVEL ERRORS
1155
1156 =head4 Mapping
1157
1158 - - - - - - - - - - - - - - - - - - - - - - - - - -
1159 info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )critical: BizWorks::Process::Setup->startSync: Can't access mapping for node "Currency" - please check BizWorks::ResourceMapping.
1160 - - - - - - - - - - - - - - - - - - - - - - - - - -
1161 You have to create a sub for each node used in synchronization inside the named Perl module (here: BizWorks::ResourceMapping).
1162 The name of this sub _must_ match the name of the node you want to sync. This sub holds mapping metadata which gives
1163 the engine hints about how to access the otherwise generic nodes.
1164 - - - - - - - - - - - - - - - - - - - - - - - - - -
1165
1166
1167 =head4 DBD::AutoCSV's rulebase
1168
1169 - - - - - - - - - - - - - - - - - - - - - - - - - -
1170 info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
1171 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1172
1173 Execution ERROR: Error while scanning: Missing first row or scanrule not applied. at C:/home/amo/develop/netfrag.org/nfo/perl/libs/DBD/CSV.p
1174 m line 165, <GEN9> line 1.
1175 called from C:/home/amo/develop/netfrag.org/nfo/perl/libs/Data/Storage/Handler/DBI.pm at 123.
1176
1177 DBI-Error: DBD::AutoCSV::st fetchrow_hashref failed: Attempt to fetch row from a Non-SELECT statement
1178 notice: Data::Transfer::Sync->syncNodes: No nodes to synchronize.
1179 - - - - - - - - - - - - - - - - - - - - - - - - - -
1180 DBD::AutoCSV contains a rulebase which is spooled down while attempting to guess the style of the csv file regarding
1181 parameters like newline (eol), column-separation-character (sep_char), quoting character (quote_char).
1182 If this spool runs out of entries and no style could be resolved, DBD::CSV dies causing this "Execution ERROR" which
1183 results in a "DBI-Error" afterwards.
1184 - - - - - - - - - - - - - - - - - - - - - - - - - -
1185
1186
1187 =head4 Check structure of source node
1188
1189 - - - - - - - - - - - - - - - - - - - - - - - - - -
1190 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1191 critical: Data::Transfer::Sync->syncNodes: Can not synchronize: No ident found in source node, maybe try to "import" this node first.
1192 - - - - - - - - - - - - - - - - - - - - - - - - - -
1193 If lowlevel detection succeeds, but no other required information is found, this message is issued.
1194 "Other information" might be:
1195 - column-header-row completely missing
1196 - ident column is empty
1197 - - - - - - - - - - - - - - - - - - - - - - - - - -
1198
1199
1200 =head4 Modify structure of source node
1201
1202 - - - - - - - - - - - - - - - - - - - - - - - - - -
1203 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1204 info: Data::Transfer::Sync->_prepareNode_MetaProperties( descent source )
1205 warning: Data::Transfer::Sync->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter...
1206 SQL ERROR: Command 'ALTER' not recognized or not supported!
1207
1208 SQL ERROR: Command 'ALTER' not recognized or not supported!
1209 - - - - - - - - - - - - - - - - - - - - - - - - - -
1210 The Engine found a node whose structure does not match the required one. It tries to alter this automatically - only when doing "import" -
1211 but the DBD driver (in this case DBD::CSV) gets in the way, croaking that it is not able to do this.
1212 This could also appear if your database connection has insufficient rights to modify the database structure.
1213 DBD::CSV croaks because it doesn't implement the ALTER command, so please edit your columns manually.
1214 Hint: Add columns with the names of your "ident" and "checksum" property specifications.
1215 - - - - - - - - - - - - - - - - - - - - - - - - - -
1216
1217
1218 =head4 Load source node by ident
1219
1220 - - - - - - - - - - - - - - - - - - - - - - - - - -
1221 info: Data::Transfer::Sync->_prepareNode_DummyIdent( descent source )
1222 pcritical: Data::Transfer::Sync->_modifyNode failed: "source" node is empty.
1223 - - - - - - - - - - - - - - - - - - - - - - - - - -
1224 The source node could not be loaded. Maybe the ident is missing. Please check manually.
1225 Hint: Like above, the ident and/or checksum columns may be missing....
1226 - - - - - - - - - - - - - - - - - - - - - - - - - -
1227
1228
1229 =head1 TODO
1230
1231 - sub _resolveIdentProvider
1232 - wrap _doModifySource_IdentChecksum and _doTransferToTarget around a core function which can change virtually any type of node
1233 - split this module up into Sync.pm, Sync/Core.pm, Sync/Compare.pm and Sync/Compare/Checksum.pm
1234 - introduce _compareNodes as a core method and wrap it around methods in Sync/Compare/Checksum.pm
1235 - introduce Sync/Compare/MyComparisonImplementation.pm
1236 - some generic deferring method - e.g. "$self->defer(action)" - to be able to accumulate a bunch of actions for later processing
1237 - this implies everything done is _really_ split up into generic actions - how else would we defer them???
1238 - example uses:
1239 - fetch whole checksum list from node
1240 - remember source ident retransmits
1241 - remember: this is convenient - and maybe / of course faster - but we'll lose "per-node-atomic" operations
1242 - feature: mechanism to implicit inject checksum property to nodes (alter table / modify schema)
1243 - expand statistics / keep track of:
1244 - touched/untouched nodes
1245 - full sync
1246 - just do a push and a pull for now but use stats for touched nodes in between to speed up things
1247 - introduce some new metadata flags for a synchronization partner which is (e.g.) of "source" or "target":
1248 - isNewNodePropagator
1249 - isWriteProtected
1250
1251
1252 =cut
1253
1254 1;

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed