/[cvs]/nfo/perl/libs/Data/Transfer/Sync.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync.pm

Parent Directory | Revision Log


Revision 1.4 - (show annotations)
Tue Dec 3 15:54:07 2002 UTC (21 years, 7 months ago) by joko
Branch: MAIN
Changes since 1.3: +5 -2 lines
+ {import}-flag is now {prepare}-flag

1 ## $Id: Sync.pm,v 1.3 2002/12/01 22:26:59 joko Exp $
2 ##
3 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4 ##
5 ## See COPYRIGHT section in pod text below for usage and distribution rights.
6 ##
7 ## ----------------------------------------------------------------------------------------
8 ## $Log: Sync.pm,v $
9 ## Revision 1.3 2002/12/01 22:26:59 joko
10 ## + minor cosmetics for logging
11 ##
12 ## Revision 1.2 2002/12/01 04:43:25 joko
13 ## + mapping detail entries may now be either an ARRAY or a HASH
14 ## + erase flag is used now (for export-operations)
15 ## + expressions to refer to values inside deep nested structures
16 ## - removed old mappingV2-code
17 ## + cosmetics
18 ## + sub _erase_all
19 ##
20 ## Revision 1.1 2002/11/29 04:45:50 joko
21 ## + initial check in
22 ##
23 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
24 ## + new
25 ## ----------------------------------------------------------------------------------------
26
27
28 package Data::Transfer::Sync;
29
30 use strict;
31 use warnings;
32
33 use Data::Dumper;
34 use misc::HashExt;
35 use libp qw( md5_base64 );
36 use libdb qw( quotesql hash2Sql );
37 use Data::Transform::Deep qw( hash2object refexpr2perlref );
38 use Data::Compare::Struct qw( getDifference isEmpty );
39
# Get the logger instance shared by all subs of this package; it is fetched
# once, when this file is loaded.
# NOTE(review): Log::Dispatch::Config is not loaded by this file - the caller
# presumably loads and configures it before requiring this module; confirm.
my $logger = Log::Dispatch::Config->instance;
42
43
# Constructor.
# Takes a flat list of key/value options which become the initial attributes
# of the object (e.g. container, storages, id_authorities,
# checksum_authorities, write_protected, verbose), then runs _init().
sub new {
    my ($invocant, @options) = @_;
    my $class = ref($invocant) || $invocant;

    $logger->debug( __PACKAGE__ . "->new(@options)" );

    my $self = bless { @options }, $class;
    $self->_init();

    return $self;
}
53
54
# Prepare the storage container for a synchronization object:
#   - create a Data::Storage::Container if none was passed to new(),
#   - register the optional "storages" hash (dbkey => storage) with it,
#   - tag the storages named in the optional list options with the flags
#     consulted later during synchronization:
#       id_authorities       -> isIdentAuthority
#       checksum_authorities -> isChecksumAuthority
#       write_protected      -> isWriteProtected
# A missing list option is treated as an empty list.
sub _init {
    my $self = shift;

    # build new container if necessary
    $self->{container} = Data::Storage::Container->new() if !$self->{container};

    # add storages to container (optional)
    foreach my $dbkey (keys %{ $self->{storages} }) {
        $self->{container}->addStorage($dbkey, $self->{storages}->{$dbkey});
    }

    # tag storages with id-authority, checksum-provider and write-protection information
    # TODO: better store tag inside metadata to hold bits together!
    my %flag_for = (
        id_authorities       => 'isIdentAuthority',
        checksum_authorities => 'isChecksumAuthority',
        write_protected      => 'isWriteProtected',
    );
    foreach my $option (sort keys %flag_for) {
        # "|| []" so an omitted option does not die under strict refs
        foreach my $dbkey (@{ $self->{$option} || [] }) {
            $self->{container}->{storage}->{$dbkey}->{ $flag_for{$option} } = 1;
        }
    }

    return;
}
73
74
# Entry point of a synchronization run.
#
# $args is a hashref of options; it is remembered as $self->{args} and read
# throughout the run. Keys read here (<partner> is "source" or "target"):
#   direction                  'push', 'pull' or 'full' (case-insensitive)
#   source, target             "<dbkey>:<node>", e.g. "L:Country" or "R:countries.csv"
#   <partner>_filter           filter used when fetching the source node list
#   <partner>_ident            IdentProvider spec "<method>:<arg>"
#   <partner>_exclude          arrayref of properties to drop from nodes
#   <partner>_type             TypeProvider spec "<method>:<arg>"
#   <partner>_callbacks_write  arrayref of properties with write callbacks
#   <partner>_callbacks_read   arrayref of properties with read callbacks
#   mapping                    arrayref of ARRAY or HASH mapping entries
#   prepare, erase             optional flags
#
# All "a:b" specs are split at the FIRST colon only, so the right-hand part
# may itself contain colons (needed for mapping entries like
# "source:expr:event->startDateTime").
#
# Returns nothing if a partner cannot be resolved or its node cannot be
# reached; otherwise returns whatever _syncNodes() returns.
#
# TODO: some feature to show off the progress of synchronization (cur/max * 100)
sub syncNodes {
    my $self = shift;
    my $args = shift;

    # remember arguments through the whole processing
    $self->{args} = $args;

    $logger->debug( __PACKAGE__ . "->syncNodes: starting" );

    # hash to hold and/or fill in metadata required for the processing
    $self->{meta} = {};

    # normalized synchronization direction (undefined behaves like an unknown one)
    my $direction = lc( defined $self->{args}->{direction} ? $self->{args}->{direction} : '' );

    # optical symbol (directed arrow) for the log message below
    my %arrow_for = ( push => '->', pull => '<-', full => '<->' );
    my $direction_arrow = $arrow_for{$direction} || '';

    # decompose identifiers for each partner
    # TODO: take this list from already established/given metadata
    foreach my $partner ('source', 'target') {

        my $meta = $self->{meta}->{$partner} = {};

        # Partner and Node (e.g.: "L:Country" or "R:countries.csv")
        if (my $item = $self->{args}->{$partner}) {
            my ($dbkey, $node) = split(/:/, $item, 2);
            $meta->{dbkey} = $dbkey;
            $meta->{node}  = $node;
        }

        # Filter
        if (my $item_filter = $self->{args}->{"${partner}_filter"}) {
            $meta->{filter} = $item_filter;
        }

        # IdentProvider ("<method>:<arg>")
        if (my $item_ident = $self->{args}->{"${partner}_ident"}) {
            my ($method, $arg) = split(/:/, $item_ident, 2);
            $meta->{IdentProvider} = { method => $method, arg => $arg };
        }

        # TODO: ChecksumProvider

        # exclude properties/subnodes
        if (my $item_exclude = $self->{args}->{"${partner}_exclude"}) {
            $meta->{subnodes_exclude} = $item_exclude;
        }

        # TypeProvider ("<method>:<arg>")
        if (my $item_type = $self->{args}->{"${partner}_type"}) {
            my ($method, $arg) = split(/:/, $item_type, 2);
            $meta->{TypeProvider} = { method => $method, arg => $arg };
        }

        # Callbacks - writers (will be triggered _before_ writing to target)
        if (my $item_writers = $self->{args}->{"${partner}_callbacks_write"}) {
            $meta->{Callback}->{write}->{$_}++ for @$item_writers;
        }

        # Callbacks - readers (will be triggered _after_ reading from source)
        if (my $item_readers = $self->{args}->{"${partner}_callbacks_read"}) {
            $meta->{Callback}->{read}->{$_}++ for @$item_readers;
        }

        # relink references to metainfo: resolve the storage object by its dbkey
        $meta->{storage} = $self->{container}->{storage}->{ $meta->{dbkey} };
    }

    $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" );

    # build mapping
    # incoming: an Array of node map entries (Array or Hash) - e.g.
    #   [ 'source:item_name' => 'target:class_val' ]
    #   { source => 'event->startDateTime', target => 'begindate' }
    # "|| []" avoids autovivifying an empty mapping (which would make
    # _buildMap() believe a mapping was given).
    foreach my $entry (@{ $self->{args}->{mapping} || [] }) {
        if (ref $entry eq 'ARRAY') {
            # first element is the source side, second the target side
            foreach my $side (@$entry[0, 1]) {
                my ($descent, $node) = split(/:/, $side, 2);
                push @{ $self->{meta}->{$descent}->{childnodes} }, $node;
            }
        } elsif (ref $entry eq 'HASH') {
            foreach my $entry_key (keys %$entry) {
                push @{ $self->{meta}->{$entry_key}->{childnodes} }, $entry->{$entry_key};
            }
        }
    }

    # check partners/nodes: does partner exist / is node available?
    foreach my $partner (keys %{ $self->{meta} }) {
        my $storage = $self->{meta}->{$partner}->{storage};
        if (!ref $storage) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not resolve storage for \"$partner\"." );
            return;
        }
        next if $storage->{locator}->{type} eq 'DBI';    # for DBD::CSV - re-enable for others
        my $node = $self->{meta}->{$partner}->{node};
        if (!$storage->existsChildNode($node)) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach \"$node\" at \"$partner\"." );
            return;
        }
    }

    # manipulate metainfo according to direction of synchronization
    # TODO:
    #   +  if action == PUSH: start processing
    #   -+ if action == PULL: swap metadata and start processing
    #   -  if action == FULL: start processing, then swap metadata and (re-)start processing
    if ($direction eq 'pull') {
        @{ $self->{meta} }{qw( source target )} = @{ $self->{meta} }{qw( target source )};
    }

    # prepare flag (formerly "import"): prepare the source node to be syncable
    # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage
    if ($self->{args}->{prepare}) {
        $self->_prepareNode_MetaProperties('source');
        $self->_prepareNode_DummyIdent('source');
    }

    # erase flag means: erase the target
    # TODO: move this method to the scope of the synchronization core and wrap it around different handlers
    if ($self->{args}->{erase}) {
        $self->_erase_all('target');
    }

    return $self->_syncNodes();
}
220 #$self->_erase_all($opts->{source_node});
221 }
222
223 # erase flag means: erase the target
224 #if ($opts->{erase}) {
225 if ($self->{args}->{erase}) {
226 # TODO: move this method to the scope of the synchronization core and wrap it around different handlers
227 #print "ERASE", "\n";
228 $self->_erase_all('target');
229 }
230
231 $self->_syncNodes();
232
233 }
234
235
# Synchronize every source node with the target, using the metadata prepared
# by syncNodes() in $self->{meta} and the options in $self->{args}.
#
# For each source node:
#   - apply read callbacks (<node>::<property>_read) and drop excluded properties,
#   - resolve its ident and stat/load the matching target node,
#   - with method "checksum", classify it as new / dirty / in sync,
#   - insert or update the target, unless the target is write-protected,
#   - write the new ident back to the source if the target is an IdentAuthority.
# With $self->{verbose}, one character is printed per step:
#   n = no target node found, s = skipped, c = checksums compared,
#   t = transferred ok, e = error, r = ident written back, : = node done.
#
# Returns the statistics object ($tc). Returns nothing if there are no nodes
# to synchronize, or if a source node has no ident (this aborts the whole run).
#
# TODO: abstract the hardwired use of "source" and "target" in here somehow
sub _syncNodes {
    my $self = shift;

    my $tc = OneLineDumpHash->new( {} );

    # set of objects may already be given in $self->{args}
    # TODO: make independent of the terminology "object..."
    my $results = $self->{args}->{objectSet};

    # apply filter
    if (my $filter = $self->{meta}->{source}->{filter}) {
        $results ||= $self->_getNodeList('source', $filter);
    }

    # get reference to node list from convenient method provided by corehandle
    $results ||= $self->_getNodeList('source');

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@{$results}) {
        $logger->notice( __PACKAGE__ . "->syncNodes: No nodes to synchronize." );
        return;
    }

    # iterate through a copy of the set
    my @results = @{$results};

    NODE:
    foreach my $source_node (@results) {

        $tc->{total}++;

        # NOTE: the source node is not cloned - read callbacks and excluded
        # properties below modify the original object.
        # TODO: is a deep copy needed? it might destroy Tangram mechanisms -
        # for now just take care that this object doesn't get updated!

        # handle read callbacks right now while scanning them (asymmetric to the writers):
        # for each registered property call <node>::<property>_read and store its result
        my $descent = 'source';
        if (my $callbacks = $self->{meta}->{$descent}->{Callback}) {
            foreach my $property (keys %{ $callbacks->{read} }) {
                # TODO: make this $self->callCallback($object, $value, $opts)
                my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $property . '_read';
                my $code = do { no strict 'refs'; \&{$perl_callback} };
                # exceptions (including "Undefined subroutine") propagate to the caller
                my $cb_result = $code->({
                    object   => $source_node,
                    property => $property,
                    value    => undef,
                    storage  => $self->{meta}->{$descent}->{storage},
                });
                $source_node->{$property} = $cb_result;
            }
        }

        # exclude defined fields (simply delete from object)
        delete $source_node->{$_} for @{ $self->{meta}->{source}->{subnodes_exclude} || [] };

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = { source => { payload => $source_node } };

        # determine ident of entry
        if (!$self->_resolveNodeIdent('source')) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." );
            return;
        }

        my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

        # mark node as new if stat/load of the target node failed
        if (!$statOK) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $self->{verbose};
        }

        # determine status of entry by synchronization method
        if (lc($self->{args}->{method} || '') eq 'checksum') {

            # TODO: is a missing source checksum really worth a "critical"? maybe just debug
            if (!$self->_readChecksum('source')) {
                $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $self->{verbose};
                next NODE;
            }

            # get checksum from synchronization target (may legitimately be missing)
            $self->_readChecksum('target');

            # determine if entry is "new" or "dirty"
            # after all, this seems to be the point where the hammer falls.....
            print "c" if $self->{verbose};
            my $source_cs = $self->{node}->{source}->{checksum};
            my $target_cs = $self->{node}->{target}->{checksum};
            $self->{node}->{status}->{new} = !$target_cs;
            if (!$self->{node}->{status}->{new}) {
                # the target checksum is known to be present here
                $self->{node}->{status}->{dirty} =
                    !$source_cs || ($source_cs ne $target_cs) || $self->{args}->{force};
            }
        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next NODE;
        }

        # build map to actually transfer the data from source to target
        $self->_buildMap();

        # additional (new) checks for feature "write-protection"
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $self->{verbose};
            $logger->notice( __PACKAGE__ . "->syncNodes: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $self->{verbose};
            $tc->{skip}++;
            next NODE;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->_readChecksum('target');

        } elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{ $self->{meta}->{target}->{IdentProvider}->{arg} };
            $self->_doTransferToTarget('update');
            $self->_readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $self->{verbose};
        }

        if ($self->{node}->{status}->{error}) {
            $tc->{error}++;
            push @{ $tc->{error_per_row} }, $self->{node}->{status}->{error};
            print "e" if $self->{verbose};
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) {
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
            print "r" if $self->{verbose};
        }

        print ":" if $self->{verbose};
    }

    print "\n" if $self->{verbose};

    # build user-message from some stats
    my $msg = "statistics: $tc";
    if ($tc->{error_per_row}) {
        $msg .= "\n" . "errors from \"error_per_row\":" . "\n" . Dumper($tc->{error_per_row});
    }

    # TODO: sysevent( { usermsg => $msg, level => $level }, $taskEvent );
    $logger->info( __PACKAGE__ . "->syncNodes: $msg" );

    return $tc;
}
481
482
# Produce a compact, single-line Data::Dumper dump of the given hashrefs.
# _calcChecksum() uses it as checksum input.
# Values that are Set::Object instances are replaced by $set->members(),
# called in scalar context; all other values are copied as-is. The input
# hashes are not modified.
# $Data::Dumper::Indent is changed only for the duration of this call (via
# local), so the caller's setting is restored afterwards, even on exceptions.
# Returns the dump as one string ($VAR1 = ...;$VAR2 = ...;).
sub _dumpCompact {
    my $self = shift;

    my @data;
    foreach my $hash (@_) {
        my %item;
        foreach my $key (keys %$hash) {
            my $val = $hash->{$key};
            # NOTE(review): members() is evaluated in scalar context, as before
            $item{$key} = (ref $val eq 'Set::Object') ? $val->members() : $val;
        }
        push @data, \%item;
    }

    # dump without newlines/indentation
    local $Data::Dumper::Indent = 0;
    return scalar Dumper(@data);
}
519
520
# Calculate the checksum of the current $descent node ('source' or 'target')
# and store it in $self->{node}->{$descent}->{checksum}; returns 1.
# The checksum input is the node's ident on its own line, followed by the
# compact payload dump from _dumpCompact(). It is hashed with
# DBI::hash($input, 1), a 32-bit integer hash taken from DBI. An md5/base64
# fingerprint was used previously.
sub _calcChecksum {
    my ($self, $descent) = @_;

    my $current = $self->{node}->{$descent} ||= {};

    # build the fingerprint input from ident + payload dump
    my $fingerprint = $current->{ident} . "\n" . $self->_dumpCompact($current->{payload});

    # TODO: $logger->dump( ... ) the fingerprint for debugging
    $current->{checksum} = DBI::hash($fingerprint, 1);

    # signal good
    return 1;
}
550
551
# Fill in $self->{node}->{$descent}->{checksum} for the current node.
#   - storages tagged isChecksumAuthority get a freshly calculated checksum
#     via _calcChecksum();
#   - all others take the stored "cs" property from the node's payload.
# Returns 1, or nothing if no $descent node is present.
# TODO: don't have the checksum column/property hardcoded as "cs" here, make this configurable somehow
sub _readChecksum {
    my ($self, $descent) = @_;

    # signal checksum bad: nothing loaded for this side
    return unless $self->{node}->{$descent};

    if (!$self->{meta}->{$descent}->{storage}->{isChecksumAuthority}) {
        $self->{node}->{$descent}->{checksum} = $self->{node}->{$descent}->{payload}->{cs};
    } else {
        $self->_calcChecksum($descent);
    }

    # signal checksum good
    return 1;
}
583
584
# Build $self->{node}->{map}: the property => value map that
# _doTransferToTarget() writes to the target.
#
# The map always contains:
#   - the target ident property (named by the target IdentProvider's arg),
#     set to the source ident, and
#   - "cs", set to the source checksum.
# If $self->{args}->{mapping} is set, each source childnode is paired by
# position with the target childnode at the same index:
#   - plain properties are read directly from the source payload;
#   - properties prefixed with "expr:" are resolved inside nested structures
#     via refexpr2perlref();
#   - values are passed through quotesql() when the target storage type is DBI.
# The last processed property/value pair is left in
# $self->{node}->{source|target}->{propcache}.
sub _buildMap {
    my $self = shift;

    my $node = $self->{node} ||= {};
    my $meta = $self->{meta} ||= {};

    # manually set object-id and checksum
    $node->{map} = {
        $meta->{target}->{IdentProvider}->{arg} => $node->{source}->{ident},
        cs => $node->{source}->{checksum},
    };

    # for transferring flat structures via simple (1:1) mapping
    # TODO: diff per property / property value
    return if !$self->{args}->{mapping};

    # "|| []": a mapping without any entries must not die under strict refs
    my @source_props = @{ $meta->{source}->{childnodes} || [] };
    return if !@source_props;
    my $target_props = $meta->{target}->{childnodes} || [];

    # encoding depends on the type of the underlying target storage (loop invariant)
    my $storage_type = $meta->{target}->{storage}->{locator}->{type};

    for my $mapidx (0 .. $#source_props) {

        my $source_cache = $node->{source}->{propcache} = {};
        my $target_cache = $node->{target}->{propcache} = {};

        # get property names (source and target are paired by position)
        $source_cache->{property} = $source_props[$mapidx];
        $target_cache->{property} = $target_props->[$mapidx];

        # get property value: plain (scalar?) value from the payload
        $source_cache->{value} = $node->{source}->{payload}->{ $source_cache->{property} };

        # detect expression
        # for transferring deeply nested structures described by expressions
        if ($source_cache->{property} =~ s/^expr://) {
            # TODO: callback map for the helper function (e.g. to dispatch the source ident)
            my $cbmap = {};
            $source_cache->{value} = refexpr2perlref($node->{source}->{payload}, $source_cache->{property}, $cbmap);
        }

        # encode values dependent on type of underlying storage here - expand cases...
        if ($storage_type eq 'DBI') {
            $source_cache->{value} = quotesql($source_cache->{value});
        }
        # TODO: Tangram - iso? utf8 already possible?  LDAP - encode utf8 here?

        # store value to transfer map
        $node->{map}->{ $target_cache->{property} } = $source_cache->{value};
    }

    return;
}
681
# Resolve the ident of the current $descent node from its payload, using the
# IdentProvider metadata of that partner:
#   method "property"       - ident is the payload property named by arg
#   method "storage_method" - ident is returned by $storage->$arg($payload)
# Any other or missing method yields no ident.
# The result (possibly undef) is stored in $self->{node}->{$descent}->{ident}.
# Returns 1 if a true ident was found, otherwise nothing.
sub _resolveNodeIdent {
    my ($self, $descent) = @_;

    my $payload = $self->{node}->{$descent}->{payload};

    my $provider_method = $self->{meta}->{$descent}->{IdentProvider}->{method};
    my $provider_arg    = $self->{meta}->{$descent}->{IdentProvider}->{arg};
    $provider_method = '' if !defined $provider_method;

    # resolve to ident
    my $ident;
    if ($provider_method eq 'property') {
        $ident = $payload->{$provider_arg};
    } elsif ($provider_method eq 'storage_method') {
        $ident = $self->{meta}->{$descent}->{storage}->$provider_arg($payload);
    }

    $self->{node}->{$descent}->{ident} = $ident;

    return 1 if $ident;
    return;
}
716
# Write $map (property => value) to the current $descent node.
#
#   $descent  partner whose storage is written ('source' or 'target')
#   $action   'insert' or 'update' (case-insensitive)
#   $map      hashref of values to write. NOTE: modified in place - write
#             callback properties and excluded properties are deleted from it.
#   $crit     optional SQL criteria for DBI updates; default is
#             "<IdentProvider arg>='<ident>'" with single quotes escaped
#
# Properties registered as write callbacks (meta Callback->{write}) are
# removed from $map. For Tangram storages they are afterwards passed to
# <node>::<property>_write. For DBI storages they are currently dropped.
#
# Sets $self->{node}->{status}->{ok} on success, or, for DBI,
# ->{error} = { statement, state, err, errstr } on failure. If a Tangram
# write callback fails, the error is printed. An object that was just
# inserted is then erased again; on update the existing object is left
# untouched (erasing it would destroy pre-existing data).
sub _modifyNode {
    my $self    = shift;
    my $descent = shift;
    my $action  = shift;
    my $map     = shift;
    my $crit    = shift;

    my $meta         = $self->{meta}->{$descent} ||= {};
    my $storage      = $meta->{storage};
    my $storage_type = $storage->{locator}->{type};

    # TODO: count nodes with an empty map inside the caller and provide a better
    # overall message if this count stays zero, e.g.:
    # "No nodes have been touched for modify: Do you have column-headers in your csv file?"

    # transfer callback nodes from value map to callback map - handle them afterwards! - (new style callbacks)
    my %write_callbacks;
    if (my $callbacks = $meta->{Callback}) {
        foreach my $property (keys %{ $callbacks->{write} }) {
            $write_callbacks{$property} = delete $map->{$property};
        }
    }

    if ($storage_type eq 'DBI') {
        # DBI speaks SQL: translate map to sql
        my $sql_main;
        if (lc $action eq 'insert') {
            $sql_main = hash2Sql($meta->{node}, $map, 'SQL_INSERT');
        } elsif (lc $action eq 'update') {
            if (!$crit) {
                # escape single quotes by doubling them (standard SQL) so an
                # ident containing a quote cannot break out of the literal
                my $ident = $self->{node}->{$descent}->{ident};
                $ident = '' if !defined $ident;
                (my $quoted_ident = $ident) =~ s/'/''/g;
                $crit = "$meta->{IdentProvider}->{arg}='$quoted_ident'";
            }
            $sql_main = hash2Sql($meta->{node}, $map, 'SQL_UPDATE', $crit);
        }

        # transfer data
        my $sqlHandle = $storage->sendCommand($sql_main);

        # handle errors
        if ($sqlHandle->err) {
            $self->{node}->{status}->{error} = {
                statement => $sql_main,
                state     => $sqlHandle->state,
                err       => $sqlHandle->err,
                errstr    => $sqlHandle->errstr,
            };
        } else {
            $self->{node}->{status}->{ok} = 1;
        }

    } elsif ($storage_type eq 'Tangram') {
        # Tangram does it the oo-way (naturally)
        my $object;

        # determine classname
        my $classname = $meta->{node};

        # properties to exclude ("|| []": the option is optional)
        my @exclude = @{ $meta->{subnodes_exclude} || [] };

        if (my $identProvider = $meta->{IdentProvider}) {
            push @exclude, $identProvider->{arg};
        }

        # TypeProvider: use the value of its arg property as the classname
        # for object creation on this side, and drop that property from the map
        if (my $typeProvider = $meta->{TypeProvider}) {
            $classname = $map->{ $typeProvider->{arg} };
            push @exclude, $typeProvider->{arg};
        }

        # exclude banned properties (remove from map)
        delete @{$map}{@exclude};

        # list of properties
        my @props = keys %{$map};

        # transfer data
        if (lc $action eq 'insert') {

            # make the object persistent in four steps:
            # - raw create (perl / class tangram scope)
            # - engine insert (tangram scope) ... this establishes inheritance - don't try to fill in inherited properties before!
            # - raw fill-in from hash (perl scope)
            # - engine update (tangram scope) ... this updates all properties just filled in
            $object = $classname->new();
            $storage->insert($object);
            $object->{$_} = undef for @props;
            hash2object($object, $map);
            $storage->update($object);

            # asymmetry: get ident after insert
            # TODO:
            # - just do this if it is an IdentAuthority
            # - use IdentProvider metadata here
            $self->{node}->{$descent}->{ident} = $storage->id($object);

        } elsif (lc $action eq 'update') {

            # get fresh object from orm first, then mix in values and re-update
            $object = $storage->load($self->{node}->{$descent}->{ident});
            hash2object($object, $map);
            $storage->update($object);
        }

        # handle new style write callbacks: call <node>::<property>_write for each
        my $error = 0;
        foreach my $property (keys %write_callbacks) {
            my $perl_callback = $meta->{node} . '::' . $property . '_write';
            my $code = do { no strict 'refs'; \&{$perl_callback} };
            eval {
                $code->({
                    object  => $object,
                    value   => $write_callbacks{$property},
                    storage => $storage,
                });
                1;
            } or do {
                $error = 1;
                print $@, "\n";
            };

            if (!$error) {
                # re-update@orm
                $storage->update($object);
            }
        }

        # handle errors
        if ($error) {
            # TODO: record the callback error in $self->{node}->{status}->{error} like the DBI branch does
            # rollback: only an object inserted by this call may be erased
            $storage->erase($object) if lc $action eq 'insert';
        } else {
            $self->{node}->{status}->{ok} = 1;
        }
    }

    return;
}
922
# Stat/load the $descent node identified by $ident into
# $self->{node}->{$descent} (ident and payload).
# The query (node, subnodes [cs], criteria "<IdentProvider arg> eq $ident")
# is skipped when a $descent node is already present, unless $force is true.
# Returns 1 on success or when nothing had to be loaded. Returns nothing when
# no ident was given, or when the query reported an error or found no entry.
#
# TODO: split into a "_statNode" (just touch the node to check for existence),
# a "_loadNode" (load node completely) and maybe a "loadNodeProperty";
# introduce $self->{nodecache} for this purpose.
# TODO: should the ident be resolved via $descent instead of being passed in?
# TODO: re-resolve ident from entry via metadata "IdentProvider".
sub _statloadNode {
    my ($self, $descent, $ident, $force) = @_;

    # already present and no reload requested: nothing to do
    return 1 if $self->{node}->{$descent} && !$force;

    # no ident: nothing to fetch (may result in an insert if no write-protection is in the way)
    return if !$ident;

    my $partner = $self->{meta}->{$descent};
    my $result  = $partner->{storage}->sendQuery({
        node      => $partner->{node},
        subnodes  => [ 'cs' ],
        criterias => [
            { key => $partner->{IdentProvider}->{arg}, op => 'eq', val => $ident },
        ],
    });

    my $entry  = $result->getNextEntry();
    my $status = $result->getStatus();

    # TODO: enhance error handling (store inside tc)
    return if ($status && $status->{err}) || !$entry;

    @{ $self->{node}->{$descent} }{qw( ident payload )} = ($ident, $entry);

    return 1;
}
985
# _doTransferToTarget( $action )
#
# Pushes the prepared value map ($self->{node}->{map}) to the "target" side
# by delegating to _modifyNode with the given $action.
# Returns whatever _modifyNode returns.
sub _doTransferToTarget {
    my ($self, $action) = @_;
    my $value_map = $self->{node}->{map};
    return $self->_modifyNode('target', $action, $value_map);
}
991
# _doModifySource_IdentChecksum( $ident_new )
#
# Turns the current "source" node into its new form by issuing an 'update'
# through _modifyNode with the map
#   { <source IdentProvider arg> => $ident_new, cs => <target checksum> }.
# Returns whatever _modifyNode returns.
#
# TODO:
# - eventually introduce an external resource to store this data to
# - then we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
    my ($self, $ident_new) = @_;

    my $ident_key = $self->{meta}->{source}->{IdentProvider}->{arg};
    my $checksum  = $self->{node}->{target}->{checksum};

    my %changes = (
        $ident_key => $ident_new,
        cs         => $checksum,
    );

    return $self->_modifyNode('source', 'update', \%changes);
}
1008
1009
# _getNodeList( $descent, $filter )
#
# Shortcut: asks the storage of $descent for the (optionally filtered) list
# of entries of the node configured for that side, via getListFiltered.
# Kept as a thin wrapper so callers don't repeat the meta lookups
# (at the cost of one extra method call).
sub _getNodeList {
    my ($self, $descent, $filter) = @_;
    my $side = $self->{meta}->{$descent};
    return $side->{storage}->getListFiltered($side->{node}, $filter);
}
1018
1019
# _prepareNode_MetaProperties( $descent )
#
# Makes sure the node of $descent carries the meta properties the sync engine
# relies on: the ident property named by the IdentProvider, and the checksum
# property "cs".
#
# The first entry of the node list is inspected. Each required property that
# is missing from it is added with
#   "ALTER TABLE <node> ADD COLUMN <property>"
# sent through the storage's sendCommand.
#
# Nothing is altered when the node has no entries (there is no structure to
# inspect) or when all required properties are already present.
#
# Note: some drivers (e.g. DBD::CSV) do not implement ALTER and will report an
# SQL error; the columns then have to be added manually.
sub _prepareNode_MetaProperties {
    my $self = shift;
    my $descent = shift;

    $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

    # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
    my $list = $self->_getNodeList($descent);
    my $firstnode = $list->[0];

    # No entry -> no structure to inspect, nothing to alter.
    return if !$firstnode;

    # TODO: "cs" is hardcoded here!
    my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );

    # Only alter when properties are really missing, and only add those.
    # (The previous check ran the ALTERs when the required/found difference
    # was *empty*, i.e. exactly when nothing was missing, and then tried to
    # re-add every required column.)
    my @missing = grep { !exists $firstnode->{$_} } @required;
    return if !@missing;

    $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
    foreach my $property (@missing) {
        my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $property";
        $self->{meta}->{$descent}->{storage}->sendCommand($sql);
    }

    return;
}
1053
# _prepareNode_DummyIdent( $descent )
#
# Gives entries of the node of $descent a generated dummy ident and resets
# their checksum "cs" to undef, using an 'update' through _modifyNode.
#
# Idents are built as (5678983 + counter) . '0001', where the counter is the
# number of entries updated so far.
#
# Each entry is addressed by an SQL criteria string ("col='value' AND ...")
# built from its properties that are not part of the ident/cs map (as
# computed by getDifference) and that carry a non-empty value.
# Entries for which no criteria can be built are skipped with a warning.
#
# Prints a "p" per updated entry when $self->{verbose} is set, and logs a
# warning if no entry was updated at all.
sub _prepareNode_DummyIdent {
    my $self = shift;
    my $descent = shift;

    $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

    my $list = $self->_getNodeList($descent);
    my $i = 0;
    my $ident_base = 5678983;
    my $ident_appendix = '0001';
    foreach my $node (@$list) {
        my $ident_dummy = ($i + $ident_base) . $ident_appendix;
        my $map = {
            $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
            cs => undef,
        };

        # diff lists and ...
        my $diff = getDifference([keys %$node], [keys %$map]);
        next if !@$diff;

        # ... build criteria from all remaining properties carrying a value.
        # "0" is a legitimate value and has to narrow the match as well, so
        # only undef and empty strings are left out.
        my @crits;
        foreach my $property (@$diff) {
            next if !$property;
            my $value = $node->{$property};
            next if !defined $value || $value eq '';
            push @crits, "$property='" . quotesql($value) . "'";
        }

        # Without any criteria the update would not be tied to this entry.
        if (!@crits) {
            $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: skipping entry without usable properties" );
            next;
        }

        my $crit = join ' AND ', @crits;
        print "p" if $self->{verbose};
        $self->_modifyNode($descent, 'update', $map, $crit);
        $i++;
    }

    print "\n" if $self->{verbose};

    if (!$i) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
    }
}
1098
# TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)

# _otherSide( $descent )
#
# Returns the opposite synchronization side:
#   'target' for 'source', 'source' for 'target', and '' for anything else.
sub _otherSide {
    my ($self, $descent) = @_;
    my %opposite_of = (
        source => 'target',
        target => 'source',
    );
    return exists $opposite_of{$descent} ? $opposite_of{$descent} : '';
}
1107
# _erase_all( $descent )
#
# Removes all entries of the node configured for $descent by calling
# eraseAll(<node name>) on that side's storage.
# Returns whatever eraseAll returns.
sub _erase_all {
    my ($self, $descent) = @_;
    my $side = $self->{meta}->{$descent};
    return $side->{storage}->eraseAll($side->{node});
}
1115
1116
1117 =pod
1118
1119
1120 =head1 DESCRIPTION
1121
Data::Transfer::Sync is a module providing a generic synchronization process
across arbitrary/multiple storages based on an ident/checksum mechanism.
It sits on top of Data::Storage.
1125
1126
1127 =head1 REQUIREMENTS
1128
1129 For full functionality:
1130 Data::Storage
1131 Data::Transform
1132 Data::Compare
1133 ... and all their dependencies
1134
1135
1136 =head1 AUTHORS / COPYRIGHT
1137
The Data::Transfer::Sync module is Copyright (c) 2002 Andreas Motl.
1139 All rights reserved.
1140
1141 You may distribute it under the terms of either the GNU General Public
1142 License or the Artistic License, as specified in the Perl README file.
1143
1144
1145 =head1 SUPPORT / WARRANTY
1146
Data::Transfer::Sync is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1148
1149
1150
1151 =head1 BUGS
1152
When in "import" mode on a Windows-style file, DBD::AutoCSV may hang.
Hint: Maybe the source node contains an ident column, but no checksum column?
1155
1156
1157 =head1 USER LEVEL ERRORS
1158
1159 =head4 Mapping
1160
1161 - - - - - - - - - - - - - - - - - - - - - - - - - -
1162 info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )critical: BizWorks::Process::Setup->startSync: Can't access mapping for node "Currency" - please check BizWorks::ResourceMapping.
1163 - - - - - - - - - - - - - - - - - - - - - - - - - -
You have to create a sub for each node used in synchronization inside the named Perl module. The name of this sub _must_ match
the name of the node you want to sync. This sub holds mapping metadata that gives the engine hints about how
to access the otherwise generic nodes.
1167 - - - - - - - - - - - - - - - - - - - - - - - - - -
1168
1169
1170 =head4 DBD::AutoCSV's rulebase
1171
1172 - - - - - - - - - - - - - - - - - - - - - - - - - -
1173 info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
1174 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1175
1176 Execution ERROR: Error while scanning: Missing first row or scanrule not applied. at C:/home/amo/develop/netfrag.org/nfo/perl/libs/DBD/CSV.p
1177 m line 165, <GEN9> line 1.
1178 called from C:/home/amo/develop/netfrag.org/nfo/perl/libs/Data/Storage/Handler/DBI.pm at 123.
1179
1180 DBI-Error: DBD::AutoCSV::st fetchrow_hashref failed: Attempt to fetch row from a Non-SELECT statement
1181 notice: Data::Transfer::Sync->syncNodes: No nodes to synchronize.
1182 - - - - - - - - - - - - - - - - - - - - - - - - - -
DBD::AutoCSV contains a rulebase which is worked through while attempting to guess the style of the csv file regarding
parameters like newline (eol), column-separation character (sep_char) and quoting character (quote_char).
If this rulebase runs out of entries and no style could be resolved, DBD::CSV dies, causing this "Execution ERROR", which
results in a "DBI-Error" afterwards.
1187 - - - - - - - - - - - - - - - - - - - - - - - - - -
1188
1189
1190 =head4 Check structure of source node
1191
1192 - - - - - - - - - - - - - - - - - - - - - - - - - -
1193 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1194 critical: Data::Transfer::Sync->syncNodes: Can not synchronize: No ident found in source node, maybe try to "import" this node first.
1195 - - - - - - - - - - - - - - - - - - - - - - - - - -
If low-level detection succeeds, but other required information is missing, this message is issued.
Such missing information might be:
- column-header-row completely missing
- ident column is empty
1200 - - - - - - - - - - - - - - - - - - - - - - - - - -
1201
1202
1203 =head4 Modify structure of source node
1204
1205 - - - - - - - - - - - - - - - - - - - - - - - - - -
1206 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1207 info: Data::Transfer::Sync->_prepareNode_MetaProperties( descent source )
1208 warning: Data::Transfer::Sync->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter...
1209 SQL ERROR: Command 'ALTER' not recognized or not supported!
1210
1211 SQL ERROR: Command 'ALTER' not recognized or not supported!
1212 - - - - - - - - - - - - - - - - - - - - - - - - - -
The engine found a node whose structure does not match the required one. It tries to alter it automatically - only when doing "import" -
but the DBD driver (in this case DBD::CSV) gets in the way, complaining that it is not able to do this.
This could also appear if your database connection has insufficient rights to modify the database structure.
DBD::CSV croaks because it doesn't implement the ALTER command, so please add your columns manually.
Hint: Add columns with the names of your "ident" and "checksum" property specifications.
1218 - - - - - - - - - - - - - - - - - - - - - - - - - -
1219
1220
1221 =head4 Load source node by ident
1222
1223 - - - - - - - - - - - - - - - - - - - - - - - - - -
1224 info: Data::Transfer::Sync->_prepareNode_DummyIdent( descent source )
1225 pcritical: Data::Transfer::Sync->_modifyNode failed: "source" node is empty.
1226 - - - - - - - - - - - - - - - - - - - - - - - - - -
The source node could not be loaded. Maybe the ident is missing. Please check manually.
Hint: As above, the ident and/or checksum columns may be missing.
1229 - - - - - - - - - - - - - - - - - - - - - - - - - -
1230
1231
1232 =head1 TODO
1233
1234 - sub _resolveIdentProvider
- wrap _doModifySource_IdentChecksum and _doTransferToTarget around a core function which can change virtually any type of node
1236 - split this module up into Sync.pm, Sync/Core.pm, Sync/Compare.pm and Sync/Compare/Checksum.pm
1237 - introduce _compareNodes as a core method and wrap it around methods in Sync/Compare/Checksum.pm
1238 - introduce Sync/Compare/MyComparisonImplementation.pm
1239 - some generic deferring method - e.g. "$self->defer(action)" - to be able to accumulate a bunch of actions for later processing
1240 - this implies everything done is _really_ split up into generic actions - how else would we defer them???
1241 - example uses:
1242 - fetch whole checksum list from node
1243 - remember source ident retransmits
- remember: this is convenient - and maybe / of course faster - but we'll lose "per-node-atomic" operations
- feature: mechanism to implicitly inject checksum property to nodes (alter table / modify schema)
1246 - expand statistics / keep track of:
1247 - touched/untouched nodes
1248 - full sync
1249 - just do a push and a pull for now but use stats for touched nodes in between to speed up things
1250 - introduce some new metadata flags for a synchronization partner which is (e.g.) of "source" or "target":
1251 - isNewNodePropagator
1252 - isWriteProtected
1253
1254
1255 =cut
1256
1257 1;

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed