/[cvs]/nfo/perl/libs/Data/Transfer/Sync.pm
ViewVC logotype

Contents of /nfo/perl/libs/Data/Transfer/Sync.pm

Parent Directory | Revision Log


Revision 1.2 - (show annotations)
Sun Dec 1 04:43:25 2002 UTC (21 years, 7 months ago) by joko
Branch: MAIN
Changes since 1.1: +77 -41 lines
+ mapping detail entries may now be either an ARRAY or a HASH
+ erase flag is used now (for export-operations)
+ expressions to refer to values inside deep nested structures
- removed old mappingV2-code
+ cosmetics
+ sub _erase_all

1 ## $Id: Sync.pm,v 1.1 2002/11/29 04:45:50 joko Exp $
2 ##
3 ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4 ##
5 ## See COPYRIGHT section in pod text below for usage and distribution rights.
6 ##
7 ## ----------------------------------------------------------------------------------------
8 ## $Log: Sync.pm,v $
9 ## Revision 1.1 2002/11/29 04:45:50 joko
10 ## + initial check in
11 ##
12 ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
13 ## + new
14 ## ----------------------------------------------------------------------------------------
15
16
17 package Data::Transfer::Sync;
18
19 use strict;
20 use warnings;
21
22 use Data::Dumper;
23 use misc::HashExt;
24 use libp qw( md5_base64 );
25 use libdb qw( quotesql hash2Sql );
26 use Data::Transform::Deep qw( hash2object refexpr2perlref );
27 use Data::Compare::Struct qw( getDifference isEmpty );
28
# Module-wide logger, fetched once from the Log::Dispatch::Config singleton.
# NOTE(review): Log::Dispatch::Config is not loaded by this file; it is
# assumed to be loaded and configured by the caller before this module is used.
my $logger = Log::Dispatch::Config->instance();
31
32
# Constructor. Accepts a flat key/value option list (e.g. container, storages,
# id_authorities, checksum_authorities, write_protected, verbose), stores the
# options as the object's fields and runs _init before returning the object.
sub new {
    my ($invocant, @options) = @_;
    my $class = ref($invocant) || $invocant;
    $logger->debug( __PACKAGE__ . "->new(@options)" );
    my $self = bless { @options }, $class;
    $self->_init();
    return $self;
}
42
43
# Set up the storage container and tag storages with their roles.
#
# Reads these optional fields of $self:
#   container            - container object; a new Data::Storage::Container
#                          is created when absent
#   storages             - hashref name => storage, added to the container
#   id_authorities       - arrayref of storage names acting as ident authority
#   checksum_authorities - arrayref of storage names acting as checksum authority
#   write_protected      - arrayref of storage names that must not be written to
#
# Each listed storage gets the matching flag (isIdentAuthority,
# isChecksumAuthority, isWriteProtected) set to 1 on its entry in
# $self->{container}->{storage}. Missing lists are simply skipped.
sub _init {
    my $self = shift;

    # build new container if necessary
    $self->{container} = Data::Storage::Container->new() if !$self->{container};

    # add storages to container (optional)
    my $storages = $self->{storages} || {};
    foreach my $name (keys %$storages) {
        $self->{container}->addStorage($name, $storages->{$name});
    }

    # tag storages with id-authority, checksum-authority and write-protection flags
    # TODO: better store tag inside metadata to hold bits together!
    my %flag_for = (
        id_authorities       => 'isIdentAuthority',
        checksum_authorities => 'isChecksumAuthority',
        write_protected      => 'isWriteProtected',
    );
    foreach my $option (keys %flag_for) {
        foreach my $name (@{ $self->{$option} || [] }) {
            $self->{container}->{storage}->{$name}->{ $flag_for{$option} } = 1;
        }
    }
}
62
63
# Synchronize the nodes of two storage partners.
#
# $args is a hashref; keys read here (more are read by _syncNodes):
#   direction                - 'push', 'pull' or 'full' (case-insensitive);
#                              'pull' swaps source and target metadata,
#                              'full' is not implemented yet (acts like 'push')
#   source, target           - "dbkey:node" strings, e.g. "L:Country" or "R:countries.csv"
#   <partner>_filter         - stored as the partner's filter
#   <partner>_ident          - "method:arg" IdentProvider, e.g. "property:oid"
#   <partner>_exclude        - arrayref of properties to drop
#   <partner>_type           - "method:arg" TypeProvider
#   <partner>_callbacks_write, <partner>_callbacks_read
#                            - arrayrefs of property names with callbacks
#   mapping                  - arrayref of ARRAY entries ['source:prop' => 'target:prop']
#                              or HASH entries { source => 'prop', target => 'prop' }
#   import                   - prepare the source node to be syncable first
#   erase                    - erase the target first
#
# Returns the statistics object from _syncNodes, or nothing when a partner
# storage or node could not be reached.
# TODO: some feature to show off the progress of synchronization (cur/max * 100)
sub syncNodes {
    my $self = shift;
    my $args = shift;

    # remember arguments through the whole processing
    $self->{args} = $args;

    $logger->debug( __PACKAGE__ . "->syncNodes: starting" );

    # hash to hold and/or fill in metadata required for the processing
    $self->{meta} = {};

    # normalize the direction once (undef means "no direction given")
    my $direction = defined $self->{args}->{direction} ? lc $self->{args}->{direction} : '';

    # optical symbol (directed arrow) used in the log message
    my %arrow_for = ( push => '->', pull => '<-', full => '<->' );
    my $direction_arrow = exists $arrow_for{$direction} ? $arrow_for{$direction} : '';

    # decompose identifiers for each partner
    # TODO: take this list from already established/given metadata
    foreach my $descent ('source', 'target') {
        $self->_meta_decomposePartner($descent);
    }

    $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" );

    # build the per-partner property lists from the mapping
    $self->_meta_buildChildnodes();

    # check partners/nodes: does partner exist / is node available?
    return if !$self->_meta_checkPartners();

    # TODO:
    # + if action == PUSH: start processing
    # -+ if action == PULL: swap metadata and start processing
    # - if action == FULL: start processing, then swap metadata and (re-)start processing

    # manipulate metainfo according to direction of synchronization
    # ('push' needs no preparation, 'full' is not implemented yet)
    if ($direction eq 'pull') {
        ($self->{meta}->{source}, $self->{meta}->{target}) =
            ($self->{meta}->{target}, $self->{meta}->{source});
    }

    # import flag means: prepare the source node to be syncable
    # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage
    if ($self->{args}->{import}) {
        $self->_prepareNode_MetaProperties('source');
        $self->_prepareNode_DummyIdent('source');
    }

    # erase flag means: erase the target
    # TODO: move this method to the scope of the synchronization core and wrap it around different handlers
    if ($self->{args}->{erase}) {
        $self->_erase_all('target');
    }

    return $self->_syncNodes();
}

# Fill $self->{meta}->{$descent} from the "<descent>*" entries of $self->{args}.
# Specs are split into at most two fields so values containing ':' stay intact.
sub _meta_decomposePartner {
    my $self    = shift;
    my $descent = shift;

    my $args = $self->{args};
    my $meta = $self->{meta}->{$descent} ||= {};

    # Partner and Node (e.g.: "L:Country" or "R:countries.csv")
    if (my $item = $args->{$descent}) {
        my ($dbkey, $node) = split(':', $item, 2);
        $meta->{dbkey} = $dbkey;
        $meta->{node}  = $node;
    }

    # Filter
    if (my $item_filter = $args->{ $descent . '_filter' }) {
        $meta->{filter} = $item_filter;
    }

    # IdentProvider
    if (my $item_ident = $args->{ $descent . '_ident' }) {
        my ($method, $arg) = split(':', $item_ident, 2);
        $meta->{IdentProvider} = { method => $method, arg => $arg };
    }

    # TODO: ChecksumProvider

    # exclude properties/subnodes
    if (my $item_exclude = $args->{ $descent . '_exclude' }) {
        $meta->{subnodes_exclude} = $item_exclude;
    }

    # TypeProvider
    if (my $item_type = $args->{ $descent . '_type' }) {
        my ($method, $arg) = split(':', $item_type, 2);
        $meta->{TypeProvider} = { method => $method, arg => $arg };
    }

    # Callbacks - writers (will be triggered _before_ writing to target)
    if (my $item_writers = $args->{ $descent . '_callbacks_write' }) {
        $meta->{Callback}->{write}->{$_}++ foreach @$item_writers;
    }

    # Callbacks - readers (will be triggered _after_ reading from source)
    if (my $item_readers = $args->{ $descent . '_callbacks_read' }) {
        $meta->{Callback}->{read}->{$_}++ foreach @$item_readers;
    }

    # relink reference to the storage object of this partner
    my $dbkey = $meta->{dbkey};
    $meta->{storage} = defined $dbkey ? $self->{container}->{storage}->{$dbkey} : undef;
}

# Collect per-partner property lists ("childnodes") from $self->{args}->{mapping}.
# ARRAY entries: [ 'source:item_name' => 'target:class_val' ] - split into at most
#   two fields so property expressions such as "expr:a->b" survive.
# HASH entries:  { source => 'expr:event->startDateTime', target => 'begindate' }
# Entries of other types are ignored.
sub _meta_buildChildnodes {
    my $self = shift;

    foreach my $entry (@{ $self->{args}->{mapping} || [] }) {
        if (ref $entry eq 'ARRAY') {
            foreach my $spec ($entry->[0], $entry->[1]) {
                my ($descent, $property) = split(':', $spec, 2);
                push @{ $self->{meta}->{$descent}->{childnodes} }, $property;
            }
        } elsif (ref $entry eq 'HASH') {
            foreach my $descent (keys %$entry) {
                push @{ $self->{meta}->{$descent}->{childnodes} }, $entry->{$descent};
            }
        }
    }
}

# Verify that every partner in $self->{meta} has a storage and that its node is
# reachable. Logs a critical message and returns false on the first failure.
sub _meta_checkPartners {
    my $self = shift;

    foreach my $partner (keys %{ $self->{meta} }) {
        my $storage = $self->{meta}->{$partner}->{storage};
        my $node    = $self->{meta}->{$partner}->{node};

        if (!$storage) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not find storage for \"$partner\"." );
            return;
        }

        # skipped for DBI (for DBD::CSV) - re-enable for others
        my $type = $storage->{locator}->{type};
        next if defined $type && $type eq 'DBI';

        if (!$storage->existsChildNode($node)) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach \"$node\" at \"$partner\"." );
            return;
        }
    }

    return 1;
}
223
224
# Core synchronization loop; runs after syncNodes has prepared $self->{meta}.
#
# Takes the node list from $self->{args}->{objectSet}, or else from
# _getNodeList('source') (with the source filter, if one is set). For every
# source node it:
#   - applies the "read" callbacks and drops excluded properties,
#   - resolves the node's ident and tries to load the matching target node,
#   - in 'checksum' mode compares source and target checksums,
#   - inserts new / updates dirty nodes on the target, unless the target
#     storage is write-protected,
#   - writes the target ident and checksum back to the source when the
#     target is an ident authority ("retransmit", printed as "r").
#
# Returns the OneLineDumpHash statistics object, or nothing when there are no
# nodes or a source node has no ident.
# TODO: abstract the hardwired use of "source" and "target" in here somehow.
sub _syncNodes {
    my $self = shift;

    my $tc = OneLineDumpHash->new( {} );
    my $results;

    # set of objects is already in $self->{args}
    # TODO: make independent of the terminology "object..."
    $results = $self->{args}->{objectSet} if $self->{args}->{objectSet};

    # apply filter
    if (my $filter = $self->{meta}->{source}->{filter}) {
        $results ||= $self->_getNodeList('source', $filter);
    }

    # otherwise take the complete node list
    $results ||= $self->_getNodeList('source');

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@{$results}) {
        $logger->notice( __PACKAGE__ . "->syncNodes: No nodes to synchronize." );
        return;
    }

    # iterate over a copy of the list
    my @results = @{$results};

    # loop-invariant: synchronization method
    my $is_checksum_method = defined $self->{args}->{method}
        && lc($self->{args}->{method}) eq 'checksum';

    foreach my $source_node (@results) {

        $tc->{total}++;

        # NOTE: no copy is made - the read callbacks and the exclusion below
        # modify the entry in place.
        # TODO: is a deep copy needed here? (it might destroy Tangram mechanisms)

        # handle new style callbacks (the readers) right now
        $self->_applyReadCallbacks($source_node);

        # exclude defined fields (simply delete from object)
        my $exclude = $self->{meta}->{source}->{subnodes_exclude};
        delete @{$source_node}{ @$exclude } if $exclude;

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = {};
        $self->{node}->{source}->{payload} = $source_node;

        # determine ident of entry
        if (!$self->_resolveNodeIdent('source')) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." );
            return;
        }

        my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident});

        # mark node as new either if there's no ident or if stat/load failed
        if (!$statOK) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $self->{verbose};
        }

        # determine status of entry by synchronization method
        if ($is_checksum_method) {

            # TODO: is this really worth a "critical"? - should rather be a debug appendix
            if (!$self->_readChecksum('source')) {
                $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $self->{verbose};
                next;
            }

            # get checksum from synchronization target
            $self->_readChecksum('target');

            # determine if entry is "new" or "dirty" - this is where the hammer falls
            print "c" if $self->{verbose};
            my $source_cs = $self->{node}->{source}->{checksum};
            my $target_cs = $self->{node}->{target}->{checksum};
            $self->{node}->{status}->{new} = !$target_cs;
            if ($target_cs) {
                $self->{node}->{status}->{dirty} =
                    !$source_cs || ($source_cs ne $target_cs) || $self->{args}->{force};
            }
        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next;
        }

        # build map to actually transfer the data from source to target
        $self->_buildMap();

        # feature "write-protection": never insert into / modify a protected target
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $self->{verbose};
            $logger->notice( __PACKAGE__ . "->syncNodes: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $self->{verbose};
            $tc->{skip}++;
            next;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->_readChecksum('target');

        } elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{ $self->{meta}->{target}->{IdentProvider}->{arg} };
            $self->_doTransferToTarget('update');
            $self->_readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $self->{verbose};
        }

        if ($self->{node}->{status}->{error}) {
            $tc->{error}++;
            push @{ $tc->{error_per_row} }, $self->{node}->{status}->{error};
            print "e" if $self->{verbose};
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) {
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
            print "r" if $self->{verbose};
        }

        print ":" if $self->{verbose};
    }

    print "\n" if $self->{verbose};

    # build user-message from some stats
    # (presumably OneLineDumpHash stringifies to a one-line summary - confirm)
    my $msg = "stats: $tc";

    if ($tc->{error_per_row}) {
        $msg .= "\n";
        $msg .= "errors from \"error_per_row\":" . "\n";
        $msg .= Dumper($tc->{error_per_row});
    }

    # TODO: sysevent( { usermsg => $msg, level => $level }, $taskEvent );
    $logger->info( __PACKAGE__ . "->syncNodes: $msg" );

    return $tc;
}

# Run the "read" callbacks configured for the source partner on one node.
# For each property P in $self->{meta}->{source}->{Callback}->{read} the
# function <source node name>::P_read is called (in scalar context) with
# { object, property, value => undef, storage }; its result is stored in
# $node->{P}. Errors raised by a callback propagate to the caller.
sub _applyReadCallbacks {
    my $self = shift;
    my $node = shift;

    my $meta      = $self->{meta}->{source};
    my $callbacks = $meta->{Callback};
    return if !$callbacks;

    foreach my $property (keys %{ $callbacks->{read} || {} }) {
        my $perl_callback = $meta->{node} . '::' . $property . '_read';
        # resolve the function by name instead of compiling code via string-eval
        # (taking a reference to a named sub is allowed under "use strict")
        my $code = \&{$perl_callback};
        $node->{$property} = $code->({
            object   => $node,
            property => $property,
            value    => undef,
            storage  => $meta->{storage},
        });
    }
}
470
471
# Serialize the given hashrefs into one compact (single-line) Data::Dumper
# string; _calcChecksum uses it as checksum input.
#
# Set::Object values are replaced by the result of their members() method.
# Hash keys are sorted so equal data always yields the same string, no matter
# the hash ordering. The Data::Dumper settings are changed with local(), so the
# caller's configuration is restored on return (and on die).
# Changing this output changes every checksum computed from it, so checksums
# stored by older versions will mismatch once.
sub _dumpCompact {
    my $self = shift;

    my @data;
    foreach my $entry (@_) {
        my %item;
        foreach my $key (keys %$entry) {
            my $val = $entry->{$key};
            if (ref $val eq 'Set::Object') {
                # NOTE(review): members() is evaluated in scalar context here;
                # confirm this captures the set contents as intended.
                $item{$key} = $val->members();
            } else {
                $item{$key} = $val;
            }
        }
        push @data, \%item;
    }

    local $Data::Dumper::Indent   = 0;
    local $Data::Dumper::Sortkeys = 1;

    # force scalar context: Dumper() returns a list in list context
    my $dump = Dumper(@data);
    return $dump;
}
508
509
# Compute the checksum of the current node on side $descent and store it in
# $self->{node}->{$descent}->{checksum}.
# The fingerprinted text is the node's ident, a newline, and the compact dump
# of its payload (_dumpCompact); the value is DBI::hash($text, 1).
# The third argument ($specifier) is accepted but not used. Always returns 1.
sub _calcChecksum {
    my ($self, $descent, $specifier) = @_;

    # ident on the first line, followed by the one-line payload dump
    my $text = $self->{node}->{$descent}->{ident}
        . "\n"
        . $self->_dumpCompact($self->{node}->{$descent}->{payload});

    # TODO: $logger->dump( ... );

    # 32-bit integer hash from DBI (the original notes: "maybe faster?" than
    # the md5_base64 fingerprint - unverified)
    $self->{node}->{$descent}->{checksum} = DBI::hash($text, 1);

    # signal good
    return 1;
}
539
540
# Fill in $self->{node}->{$descent}->{checksum} for the current node.
# Storages flagged isChecksumAuthority get a freshly calculated checksum
# (_calcChecksum); for all others it is taken from the payload's "cs" property.
# Returns 1, or nothing when no node is loaded for $descent.
# TODO: don't have the checksum property hardcoded as "cs"; make it configurable
sub _readChecksum {
    my ($self, $descent) = @_;

    # no node loaded on this side: signal checksum bad
    return unless $self->{node}->{$descent};

    unless ($self->{meta}->{$descent}->{storage}->{isChecksumAuthority}) {
        # stored checksum travels inside the payload
        $self->{node}->{$descent}->{checksum} = $self->{node}->{$descent}->{payload}->{cs};
    }
    else {
        $self->_calcChecksum($descent);
    }

    # signal checksum good
    return 1;
}
572
573
# Build $self->{node}->{map}: the property => value hash that is written to the
# target for the current node.
#
# The map always contains the source ident (under the target IdentProvider's
# arg) and the source checksum (under "cs"). When a mapping is configured, the
# i-th source childnode is copied to the i-th target childnode:
#   - plain property names are read directly from the source payload,
#   - names prefixed with "expr:" are resolved with refexpr2perlref against the
#     payload (for deeply nested structures),
#   - values are passed through quotesql when the target storage type is 'DBI'.
# The last property/value pair handled stays in ->{propcache} of source/target.
sub _buildMap {
    my $self = shift;

    my $node = $self->{node} ||= {};
    my $meta = $self->{meta} ||= {};

    # mapping of target property names to values
    $node->{map} = {};

    # manually set the object-id ...
    $node->{map}->{ $meta->{target}->{IdentProvider}->{arg} } = $node->{source}->{ident};
    # ... and the checksum
    $node->{map}->{cs} = $node->{source}->{checksum};

    # nothing more to transfer without a mapping (flat 1:1 mapping only)
    # TODO: diff per property / property value
    return if !$self->{args}->{mapping};

    # a mapping without childnodes simply transfers nothing else
    my @source_props = @{ $meta->{source}->{childnodes} || [] };
    my @target_props = @{ $meta->{target}->{childnodes} || [] };

    for my $mapidx (0 .. $#source_props) {

        # working area for the current property pair (reset per property)
        my $source_cache = $node->{source}->{propcache} = {};
        my $target_cache = $node->{target}->{propcache} = {};

        $source_cache->{property} = $source_props[$mapidx];
        $target_cache->{property} = $target_props[$mapidx];

        my $property = $source_cache->{property};
        if (defined $property && $property =~ s/^expr://) {
            # expression: refers to a value inside a deeply nested structure
            $source_cache->{property} = $property;
            $source_cache->{value} = refexpr2perlref($node->{source}->{payload}, $property, {});
        } else {
            # plain (scalar) value
            $source_cache->{value} = $node->{source}->{payload}->{$property};
        }

        # encode values dependent on type of underlying storage - expand cases...
        # TODO: Tangram (iso? utf8 already possible?), LDAP (encode utf8 here?)
        my $storage_type = $meta->{target}->{storage}->{locator}->{type};
        if (defined $storage_type && $storage_type eq 'DBI') {
            $source_cache->{value} = quotesql($source_cache->{value});
        }

        # store value to transfer map
        $node->{map}->{ $target_cache->{property} } = $source_cache->{value};
    }
}
670
# Resolve the ident of the current node on side $descent and store it in
# $self->{node}->{$descent}->{ident}.
#
# The IdentProvider metadata ({ method, arg }) decides how:
#   'property'       - read the payload property named by arg
#   'storage_method' - call the storage method named by arg with the payload
# Any other (or missing) method yields no ident.
# Returns 1 when a true ident was found, nothing otherwise.
sub _resolveNodeIdent {
    my $self    = shift;
    my $descent = shift;

    my $payload  = $self->{node}->{$descent}->{payload};
    my $provider = $self->{meta}->{$descent}->{IdentProvider} || {};
    my $method   = defined $provider->{method} ? $provider->{method} : '';
    my $arg      = $provider->{arg};

    my $ident;
    if ($method eq 'property') {
        $ident = $payload->{$arg};
    } elsif ($method eq 'storage_method') {
        $ident = $self->{meta}->{$descent}->{storage}->$arg($payload);
    }

    $self->{node}->{$descent}->{ident} = $ident;

    return 1 if $ident;
    return;
}
704
705
# Write $map to the node of side $descent.
#
# Parameters:
#   $descent - 'source' or 'target'; selects $self->{meta}->{$descent} and
#              $self->{node}->{$descent}
#   $action  - 'insert' or 'update' (case-insensitive)
#   $map     - hashref property => value; modified in place: properties that
#              have a "write" callback and (for Tangram) excluded properties
#              are removed from it
#   $crit    - optional SQL criteria for DBI updates; defaults to
#              <ident property>='<current ident>'
#
# The storage's locator type selects the handler (DBI or Tangram); storages
# of other types are not touched. On success $self->{node}->{status}->{ok} is
# set to 1; on failure $self->{node}->{status}->{error} holds a hashref
# describing the problem.
# TODO: count the cases where no node was touched inside the caller and give a
#       better overall message, e.g. "No nodes have been touched for modify:
#       Do you have column-headers in your csv file?"
sub _modifyNode {
    my $self    = shift;
    my $descent = shift;
    my $action  = shift;
    my $map     = shift;
    my $crit    = shift;

    my $meta = $self->{meta}->{$descent};
    $action = defined $action ? lc $action : '';

    # move callback properties from the value map to the callback map;
    # they are handled after the object has been written (new style callbacks)
    my %write_callbacks;
    if (my $callbacks = $meta->{Callback}) {
        foreach my $property (keys %{ $callbacks->{write} || {} }) {
            $write_callbacks{$property} = delete $map->{$property};
        }
    }

    my $storage_type = $meta->{storage}->{locator}->{type};
    $storage_type = '' if !defined $storage_type;

    if ($storage_type eq 'DBI') {
        # DBI speaks SQL
        # NOTE(review): "write" callbacks are removed from the map above but
        # are not executed for DBI storages.
        $self->_modifyNode_DBI($descent, $action, $map, $crit);
    } elsif ($storage_type eq 'Tangram') {
        # Tangram does it the oo-way (naturally)
        $self->_modifyNode_Tangram($descent, $action, $map, \%write_callbacks);
    }

    return;
}

# DBI handler for _modifyNode: build an INSERT/UPDATE statement from $map via
# hash2Sql, send it with the storage's sendCommand and record the outcome.
sub _modifyNode_DBI {
    my ($self, $descent, $action, $map, $crit) = @_;

    my $meta = $self->{meta}->{$descent};

    # translate map to sql
    my $sql_main;
    if ($action eq 'insert') {
        $sql_main = hash2Sql($meta->{node}, $map, 'SQL_INSERT');
    } elsif ($action eq 'update') {
        # NOTE(review): the ident is interpolated into the SQL unescaped; this
        # is only safe as long as idents never contain quotes.
        $crit ||= "$meta->{IdentProvider}->{arg}='$self->{node}->{$descent}->{ident}'";
        $sql_main = hash2Sql($meta->{node}, $map, 'SQL_UPDATE', $crit);
    } else {
        $self->{node}->{status}->{error} = { errstr => "unknown action \"$action\"" };
        return;
    }

    # transfer data
    my $sqlHandle = $meta->{storage}->sendCommand($sql_main);

    if (!$sqlHandle) {
        $self->{node}->{status}->{error} = {
            statement => $sql_main,
            errstr    => 'sendCommand returned no statement handle',
        };
        return;
    }

    # handle errors
    if ($sqlHandle->err) {
        $self->{node}->{status}->{error} = {
            statement => $sql_main,
            state     => $sqlHandle->state,
            err       => $sqlHandle->err,
            errstr    => $sqlHandle->errstr,
        };
    } else {
        $self->{node}->{status}->{ok} = 1;
    }

    return;
}

# Tangram handler for _modifyNode: create or load the object, fill it from
# $map with hash2object, persist it, then run the "write" callbacks
# (<node>::<property>_write). A failed callback marks the node as erroneous;
# a freshly inserted object is then erased again (rollback).
sub _modifyNode_Tangram {
    my ($self, $descent, $action, $map, $write_callbacks) = @_;

    my $meta    = $self->{meta}->{$descent};
    my $storage = $meta->{storage};

    # determine classname
    my $classname = $meta->{node};

    # properties to exclude (the exclude list is optional)
    my @exclude = @{ $meta->{subnodes_exclude} || [] };

    if (my $identProvider = $meta->{IdentProvider}) {
        push @exclude, $identProvider->{arg};
    }

    # TypeProvider: use its value from the map as classname for object
    # creation on this side, and don't transfer it as a property
    if (my $typeProvider = $meta->{TypeProvider}) {
        $classname = $map->{ $typeProvider->{arg} };
        push @exclude, $typeProvider->{arg};
    }

    # exclude banned properties (remove from map)
    delete @{$map}{@exclude};

    # list of properties
    my @props = keys %{$map};

    my $object;
    if ($action eq 'insert') {

        if (!$classname) {
            $self->{node}->{status}->{error} = { errstr => 'no classname to create the object from' };
            return;
        }

        # make the object persistent in four steps:
        # - raw create (perl / class tangram scope)
        # - engine insert (tangram scope) ... this establishes inheritance - don't try to fill in inherited properties before!
        # - raw fill-in from hash (perl scope)
        # - engine update (tangram scope) ... this updates all properties just filled in
        $object = $classname->new();
        $storage->insert($object);
        $object->{$_} = undef foreach @props;
        hash2object($object, $map);
        $storage->update($object);

        # asymmetry: get ident after insert
        # TODO:
        # - just do this if it is an IdentAuthority
        # - use IdentProvider metadata here
        $self->{node}->{$descent}->{ident} = $storage->id($object);

    } elsif ($action eq 'update') {

        # get fresh object from orm first, then mix in values
        $object = $storage->load($self->{node}->{$descent}->{ident});
        hash2object($object, $map);
        $storage->update($object);

    } else {
        $self->{node}->{status}->{error} = { errstr => "unknown action \"$action\"" };
        return;
    }

    # handle new style "write" callbacks; after the first failure the
    # remaining callbacks still run, but the object is no longer re-updated
    my $error;
    foreach my $property (keys %$write_callbacks) {
        my $perl_callback = $meta->{node} . '::' . $property . '_write';
        my $ok = eval {
            # resolve the function by name instead of compiling code via string-eval
            my $code = \&{$perl_callback};
            $code->({
                object  => $object,
                value   => $write_callbacks->{$property},
                storage => $storage,
            });
            1;
        };
        if (!$ok) {
            $error = $@ || "callback $perl_callback failed";
            $logger->error( __PACKAGE__ . "->_modifyNode: $error" );
        }

        # re-update@orm
        $storage->update($object) if !$error;
    }

    if ($error) {
        $self->{node}->{status}->{error} = {
            action => $action,
            errstr => $error,
        };
        # rollback: only a freshly inserted object is removed again; an
        # existing object must never be erased because an update failed
        $storage->erase($object) if $action eq 'insert';
    } else {
        $self->{node}->{status}->{ok} = 1;
    }

    return;
}
911
# TODO:
# this should be split up into...
# - a "_statNode" (should just touch the node to check for existence)
# - a "_loadNode" (should load node completely)
# - maybe additionally a "loadNodeProperty" (may specify properties to load)
# - introduce $self->{nodecache} for this purpose
# TODO:
# should we:
# - not pass ident in here but resolve it via "$descent"?
# - refactor this and stuff it with additional debug/error message
# - this = the way the implicit load mechanism works

# Load the node with ident $ident from the storage of side $descent into
# $self->{node}->{$descent} (keys "ident" and "payload").
# Nothing is queried when a node is already present for $descent, unless
# $force is true. The query requests the "cs" subnode and matches the
# IdentProvider property against $ident.
# Returns 1 when a node is (or already was) present; nothing when no ident was
# given, the query reported an error, or no entry was found.
sub _statloadNode {
    my ($self, $descent, $ident, $force) = @_;

    # already loaded and no reload requested
    return 1 if $self->{node}->{$descent} && !$force;

    # without an ident there is nothing to look up (may result in an insert)
    return if !$ident;

    my $meta  = $self->{meta}->{$descent};
    my $query = {
        node      => $meta->{node},
        subnodes  => ['cs'],
        criterias => [
            {
                key => $meta->{IdentProvider}->{arg},
                op  => 'eq',
                val => $ident,
            },
        ],
    };
    my $result = $meta->{storage}->sendQuery($query);

    my $entry  = $result->getNextEntry();
    my $status = $result->getStatus();

    # TODO: enhance error handling (store inside tc)
    return if ($status && $status->{err}) || !$entry;

    # TODO: re-resolve ident from entry via metadata "IdentProvider"
    $self->{node}->{$descent}->{ident}   = $ident;
    $self->{node}->{$descent}->{payload} = $entry;

    return 1;
}
974
# Apply the currently prepared property map ($self->{node}->{map}) to the
# target side using the given action.
#
# Params:  $action - modification action forwarded to _modifyNode
# Returns: whatever _modifyNode returns
sub _doTransferToTarget {
  my ($self, $action) = @_;
  my $property_map = $self->{node}->{map};
  return $self->_modifyNode('target', $action, $property_map);
}
980
# Write a new ident and the target's checksum back into the source node,
# turning the old source node into one that references the target.
#
# Params:  $ident_new - ident value to store under the source IdentProvider column
# Returns: whatever _modifyNode returns
#
# TODO:
# - eventually introduce an external resource to store this data to
# - we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
  my ($self, $ident_new) = @_;

  my $ident_column = $self->{meta}->{source}->{IdentProvider}->{arg};
  my $checksum     = $self->{node}->{target}->{checksum};

  my %changes = (
    $ident_column => $ident_new,
    cs            => $checksum,
  );

  return $self->_modifyNode('source', 'update', \%changes);
}
997
998
999 # this is a shortcut method
1000 # ... let's try to avoid _any_ redundant code in here (ok... - at the cost of method lookups...)
# Fetch the list of nodes of side $descent from its storage, optionally
# restricted by $filter (passed through to the storage's getListFiltered).
#
# Params:  $descent - which side to list ('source' or 'target')
#          $filter  - optional filter, forwarded unchanged
# Returns: whatever the storage's getListFiltered returns
sub _getNodeList {
  my ($self, $descent, $filter) = @_;
  my $side = $self->{meta}->{$descent};
  return $side->{storage}->getListFiltered($side->{node}, $filter);
}
1007
1008
# Make sure the nodes of side $descent carry the meta properties the sync
# engine relies on: the ident column named by the side's IdentProvider and
# the checksum column "cs". Each missing column is added by sending
# "ALTER TABLE <node> ADD COLUMN <name>" (no column type given) through the
# side's storage.
#
# Column presence is judged from the keys of the first node returned by
# _getNodeList. Only columns that are really missing are altered, so
# existing columns are never re-added. When the side has no nodes at all,
# nothing can be inspected and no ALTER is attempted.
#
# Params:  $descent - which side to prepare ('source' or 'target')
# Returns: nothing meaningful
sub _prepareNode_MetaProperties {
  my $self = shift;
  my $descent = shift;

  $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

  # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
  my $list = $self->_getNodeList($descent);
  my $firstnode = $list ? $list->[0] : undef;

  if (!$firstnode) {
    $logger->notice( __PACKAGE__ . "->_prepareNode_MetaProperties: no node found - can not check for meta properties" );
    return;
  }

  # Determine which meta properties/nodes are absent from the node.
  # Computed directly instead of via getDifference/isEmpty: the original test
  # "isEmpty(getDifference(\@required, \@found))" was true when nothing was
  # missing, and then tried to add every required column.
  # TODO: "cs" is hardcoded here!
  my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );
  my @missing = grep { !exists $firstnode->{$_} } @required;
  return if !@missing;

  $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
  foreach my $property (@missing) {
    my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $property";
    # NOTE(review): the command result is not inspected; drivers like DBD::CSV
    # report "ALTER not supported" on their own (see POD "USER LEVEL ERRORS").
    $self->{meta}->{$descent}->{storage}->sendCommand($sql);
  }
}
1042
# Give every node of side $descent a dummy ident and an undefined checksum.
# The n-th touched node (counting from 0) gets the ident "<5678983 + n>0001".
#
# The row to update is addressed by an "AND"-joined criteria string of the
# form "column='value'", built from the node's columns as returned by
# getDifference against the meta columns (ident, cs). Only columns holding
# a defined, non-empty value are used. A legitimate false value like "0"
# counts as a value, so the criteria stays as specific as possible. Nodes
# for which no criteria can be built are skipped with a warning rather than
# updated without any node-specific criteria.
#
# Params:  $descent - which side to prepare ('source' or 'target')
# Returns: nothing meaningful; warns when no node was touched.
# Prints "p" per updated node (and a final newline) when $self->{verbose}.
sub _prepareNode_DummyIdent {
  my $self = shift;
  my $descent = shift;

  $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

  my $list = $self->_getNodeList($descent);
  my $i = 0;
  my $ident_base = 5678983;
  my $ident_appendix = '0001';
  my $ident_key = $self->{meta}->{$descent}->{IdentProvider}->{arg};

  foreach my $node (@$list) {
    my $ident_dummy = ($i + $ident_base) . $ident_appendix;
    my $map = {
      $ident_key => $ident_dummy,
      cs => undef,
    };

    # diff lists and ...
    my $diff = getDifference([keys %$node], [keys %$map]);
    next if $#{$diff} == -1;

    # ... build criteria including all columns which carry a value
    my @crits;
    foreach my $property (@$diff) {
      next if !defined $property || $property eq '';
      my $value = $node->{$property};
      # skip only NULL/empty columns - "0" is a valid, distinguishing value
      next if !defined $value || $value eq '';
      push @crits, "$property='" . quotesql($value) . "'";
    }

    # an update without criteria would not be bound to this very node
    if (!@crits) {
      $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: node has no usable columns to build criteria from - skipping" );
      next;
    }

    my $crit = join ' AND ', @crits;
    print "p" if $self->{verbose};
    $self->_modifyNode($descent, 'update', $map, $crit);
    $i++;
  }

  print "\n" if $self->{verbose};

  if (!$i) {
    $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
  }
}
1087
1088 # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
# Map a synchronization side to its counterpart: 'source' <-> 'target'.
# Any other value yields an empty string.
sub _otherSide {
  my ($self, $descent) = @_;
  my %opposite_of = (
    source => 'target',
    target => 'source',
  );
  return exists $opposite_of{$descent} ? $opposite_of{$descent} : '';
}
1096
# Erase all entries of the node configured for side $descent by delegating
# to that side's storage (eraseAll).
#
# Params:  $descent - which side to wipe ('source' or 'target')
# Returns: whatever the storage's eraseAll returns
sub _erase_all {
  my ($self, $descent) = @_;
  my $side = $self->{meta}->{$descent};
  return $side->{storage}->eraseAll($side->{node});
}
1104
1105
1106 =pod
1107
1108
1109 =head1 DESCRIPTION
1110
1111 Data::Transfer::Sync is a module providing a generic synchronization process
across arbitrary/multiple storages based on an ident/checksum mechanism.
1113 It sits on top of Data::Storage.
1114
1115
1116 =head1 REQUIREMENTS
1117
1118 For full functionality:
1119 Data::Storage
1120 Data::Transform
1121 Data::Compare
1122 ... and all their dependencies
1123
1124
1125 =head1 AUTHORS / COPYRIGHT
1126
The Data::Transfer::Sync module is Copyright (c) 2002 Andreas Motl.
1128 All rights reserved.
1129
1130 You may distribute it under the terms of either the GNU General Public
1131 License or the Artistic License, as specified in the Perl README file.
1132
1133
1134 =head1 SUPPORT / WARRANTY
1135
Data::Transfer::Sync is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1137
1138
1139
1140 =head1 BUGS
1141
When in "import" mode for a Windows file, DBD::AutoCSV may hang.
Hint: Maybe the source node contains an ident column but no checksum column?
1144
1145
1146 =head1 USER LEVEL ERRORS
1147
1148 =head4 Mapping
1149
1150 - - - - - - - - - - - - - - - - - - - - - - - - - -
info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
critical: BizWorks::Process::Setup->startSync: Can't access mapping for node "Currency" - please check BizWorks::ResourceMapping.
1152 - - - - - - - - - - - - - - - - - - - - - - - - - -
You have to create a sub for each node used in synchronization inside the named Perl module. The name of this sub _must_ match
the name of the node you want to sync. This sub holds mapping metadata that gives the engine hints about how
to access the otherwise generic nodes.
1156 - - - - - - - - - - - - - - - - - - - - - - - - - -
1157
1158
1159 =head4 DBD::AutoCSV's rulebase
1160
1161 - - - - - - - - - - - - - - - - - - - - - - - - - -
1162 info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
1163 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1164
Execution ERROR: Error while scanning: Missing first row or scanrule not applied. at C:/home/amo/develop/netfrag.org/nfo/perl/libs/DBD/CSV.pm line 165, <GEN9> line 1.
1167 called from C:/home/amo/develop/netfrag.org/nfo/perl/libs/Data/Storage/Handler/DBI.pm at 123.
1168
1169 DBI-Error: DBD::AutoCSV::st fetchrow_hashref failed: Attempt to fetch row from a Non-SELECT statement
1170 notice: Data::Transfer::Sync->syncNodes: No nodes to synchronize.
1171 - - - - - - - - - - - - - - - - - - - - - - - - - -
DBD::AutoCSV contains a rulebase which it works through while attempting to guess the style of the CSV file regarding
parameters like newline (eol), column separation character (sep_char) and quoting character (quote_char).
If it runs out of rules and no style could be resolved, DBD::CSV dies, causing this "Execution ERROR", which
results in a "DBI-Error" afterwards.
1176 - - - - - - - - - - - - - - - - - - - - - - - - - -
1177
1178
1179 =head4 Check structure of source node
1180
1181 - - - - - - - - - - - - - - - - - - - - - - - - - -
1182 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1183 critical: Data::Transfer::Sync->syncNodes: Can not synchronize: No ident found in source node, maybe try to "import" this node first.
1184 - - - - - - - - - - - - - - - - - - - - - - - - - -
If low-level detection succeeds but other required information is missing, this message is issued.
Such missing information might be:
- the column header row is missing completely
- the ident column is empty
1189 - - - - - - - - - - - - - - - - - - - - - - - - - -
1190
1191
1192 =head4 Modify structure of source node
1193
1194 - - - - - - - - - - - - - - - - - - - - - - - - - -
1195 info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1196 info: Data::Transfer::Sync->_prepareNode_MetaProperties( descent source )
1197 warning: Data::Transfer::Sync->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter...
1198 SQL ERROR: Command 'ALTER' not recognized or not supported!
1199
1200 SQL ERROR: Command 'ALTER' not recognized or not supported!
1201 - - - - - - - - - - - - - - - - - - - - - - - - - -
The engine found a node whose structure does not match the required one. It tries to alter it automatically (only when doing an "import"),
but the DBD driver (in this case DBD::CSV) gets in the way, croaking that it is unable to do this.
1204 This could also appear if your database connection has insufficient rights to modify the database structure.
1205 DBD::CSV croaks because it doesn't implement the ALTER command, so please edit your columns manually.
1206 Hint: Add columns with the names of your "ident" and "checksum" property specifications.
1207 - - - - - - - - - - - - - - - - - - - - - - - - - -
1208
1209
1210 =head4 Load source node by ident
1211
1212 - - - - - - - - - - - - - - - - - - - - - - - - - -
1213 info: Data::Transfer::Sync->_prepareNode_DummyIdent( descent source )
1214 pcritical: Data::Transfer::Sync->_modifyNode failed: "source" node is empty.
1215 - - - - - - - - - - - - - - - - - - - - - - - - - -
1216 The source node could not be loaded. Maybe the ident is missing. Please check manually.
Hint: As above, the ident and/or checksum columns may be missing.
1218 - - - - - - - - - - - - - - - - - - - - - - - - - -
1219
1220
1221 =head1 TODO
1222
1223 - sub _resolveIdentProvider
- make _doModifySource and _doTransferToTarget wrappers around a core function which can change virtually any type of node
1225 - split this module up into Sync.pm, Sync/Core.pm, Sync/Compare.pm and Sync/Compare/Checksum.pm
1226 - introduce _compareNodes as a core method and wrap it around methods in Sync/Compare/Checksum.pm
1227 - introduce Sync/Compare/MyComparisonImplementation.pm
1228 - some generic deferring method - e.g. "$self->defer(action)" - to be able to accumulate a bunch of actions for later processing
1229 - this implies everything done is _really_ split up into generic actions - how else would we defer them???
1230 - example uses:
1231 - fetch whole checksum list from node
1232 - remember source ident retransmits
- remember: this is convenient - and maybe / of course faster - but we'll lose "per-node-atomic" operations
- feature: mechanism to implicitly inject a checksum property into nodes (alter table / modify schema)
1235 - expand statistics / keep track of:
1236 - touched/untouched nodes
1237 - full sync
1238 - just do a push and a pull for now but use stats for touched nodes in between to speed up things
1239 - introduce some new metadata flags for a synchronization partner which is (e.g.) of "source" or "target":
1240 - isNewNodePropagator
1241 - isWriteProtected
1242
1243
1244 =cut
1245
1246 1;

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed