/[cvs]/nfo/perl/libs/Data/Transfer/Sync.pm
ViewVC logotype

Annotation of /nfo/perl/libs/Data/Transfer/Sync.pm

Parent Directory | Revision Log


Revision 1.1 - (hide annotations)
Fri Nov 29 04:45:50 2002 UTC (21 years, 7 months ago) by joko
Branch: MAIN
+ initial check in

1 joko 1.1 ## $Id$
2     ##
3     ## Copyright (c) 2002 Andreas Motl <andreas.motl@ilo.de>
4     ##
5     ## See COPYRIGHT section in pod text below for usage and distribution rights.
6     ##
7     ## ----------------------------------------------------------------------------------------
8     ## $Log$
9     ## Revision 1.1 2002/10/10 03:44:21 cvsjoko
10     ## + new
11     ## ----------------------------------------------------------------------------------------
12    
13    
14     package Data::Transfer::Sync;
15    
16     use strict;
17     use warnings;
18    
19     use Data::Dumper;
20     use misc::HashExt;
21     use libp qw( md5_base64 );
22     use libdb qw( quotesql hash2Sql );
23     use Data::Transform::OO qw( hash2object );
24     use Data::Compare::Struct qw( getDifference isEmpty );
25    
# Package-wide logger handle, fetched once from the Log::Dispatch::Config
# singleton when this module is compiled. This file does not load
# Log::Dispatch::Config itself, so the application presumably loads and
# configures it before requiring this module (NOTE(review): confirm).
my $logger = Log::Dispatch::Config->instance();
28    
29    
# Constructor. Takes a flat list of key/value option pairs, which become the
# attributes of the new object, then runs _init() to set up the storage
# container. May be invoked on the class name or on an existing instance.
sub new {
    my ($invocant, @options) = @_;

    my $class = ref $invocant ? ref $invocant : $invocant;
    my $self  = { @options };

    $logger->debug( __PACKAGE__ . "->new(@options)" );

    bless $self, $class;
    $self->_init();

    return $self;
}
39    
40    
# Second construction phase, called from new():
#   - creates a Data::Storage::Container unless one was passed as "container",
#   - registers every entry of the optional "storages" hash (name => storage)
#     with the container,
#   - tags the named storages with the flags consulted during synchronization:
#       id_authorities       => isIdentAuthority
#       checksum_authorities => isChecksumAuthority
#       write_protected      => isWriteProtected
#     Each of these options is an optional array ref of storage names.
sub _init {
    my $self = shift;

    # build new container if necessary
    $self->{container} = Data::Storage::Container->new() if !$self->{container};

    # add storages to container (optional)
    foreach my $name (keys %{ $self->{storages} }) {
        $self->{container}->{storages} if 0;    # (no-op guard removed below)
        $self->{container}->addStorage($name, $self->{storages}->{$name});
    }

    # tag storages with id-authority, checksum-authority and write-protection information
    # TODO: better store tag inside metadata to hold bits together!
    my %flag_for_option = (
        id_authorities       => 'isIdentAuthority',
        checksum_authorities => 'isChecksumAuthority',
        write_protected      => 'isWriteProtected',
    );
    foreach my $option (sort keys %flag_for_option) {
        my $flag = $flag_for_option{$option};
        # the lists are optional - a missing one simply tags nothing
        foreach my $name (@{ $self->{$option} || [] }) {
            $self->{container}->{storage}->{$name}->{$flag} = 1;
        }
    }

    return;
}
59    
60    
# Entry point of a synchronization run.
#
# $args is a hash ref of options; it is kept in $self->{args} for the whole
# run. Keys read here:
#   direction                 - 'push', 'pull' or 'full' (case-insensitive)
#   source, target            - "<dbkey>:<node>", e.g. "L:Country"
#   <partner>_filter          - filter passed on to the node list query
#   <partner>_ident           - "<method>:<arg>" (IdentProvider)
#   <partner>_exclude         - properties/subnodes to exclude
#   <partner>_type            - "<method>:<arg>" (TypeProvider)
#   <partner>_callbacks_write - array ref of property names ("write" callbacks)
#   <partner>_callbacks_read  - array ref of property names ("read" callbacks)
#   mapping                   - array ref of ["<partner>:<prop>", "<partner>:<prop>"]
#   import                    - prepare the source node (meta properties, dummy ident)
# Further keys (method, force, objectSet, ...) are read by _syncNodes().
#
# Returns the result of _syncNodes(), or an empty return if a partner
# storage/node cannot be reached.
# TODO: some feature to show off the progress of synchronization (cur/max * 100)
sub syncNodes {
    my $self = shift;
    my $args = shift;

    # remember arguments through the whole processing
    $self->{args} = $args;

    $logger->debug( __PACKAGE__ . "->syncNodes: starting" );

    # hash to hold and/or fill in metadata required for the processing
    $self->{meta} = {};

    # normalize the direction once; a missing direction becomes '' (no warnings)
    my $direction = defined $self->{args}->{direction} ? lc $self->{args}->{direction} : '';

    # optical symbol (directed arrow) for the log message
    my %arrow_for = ( push => '->', pull => '<-', full => '<->' );
    my $direction_arrow = exists $arrow_for{$direction} ? $arrow_for{$direction} : '';

    # decompose identifiers for each partner
    # TODO: take this list from already established/given metadata
    foreach my $descent ('source', 'target') {
        my $meta = $self->{meta}->{$descent} = {};

        # Partner and Node (e.g.: "L:Country" or "R:countries.csv")
        if (my $item = $self->{args}->{$descent}) {
            ($meta->{dbkey}, $meta->{node}) = split /:/, $item;
        }

        # Filter
        if (my $item_filter = $self->{args}->{ $descent . '_filter' }) {
            $meta->{filter} = $item_filter;
        }

        # IdentProvider and TypeProvider, both given as "<method>:<arg>"
        foreach my $provider ( [ '_ident', 'IdentProvider' ], [ '_type', 'TypeProvider' ] ) {
            my ($suffix, $slot) = @$provider;
            if (my $spec = $self->{args}->{ $descent . $suffix }) {
                my ($method, $arg) = split /:/, $spec;
                $meta->{$slot} = { method => $method, arg => $arg };
            }
        }

        # TODO: ChecksumProvider

        # exclude properties/subnodes
        if (my $item_exclude = $self->{args}->{ $descent . '_exclude' }) {
            $meta->{subnodes_exclude} = $item_exclude;
        }

        # Callbacks - writers are triggered _before_ writing to the target,
        # readers _after_ reading from the source
        foreach my $kind ('write', 'read') {
            my $names = $self->{args}->{ $descent . '_callbacks_' . $kind } or next;
            $meta->{Callback}->{$kind}->{$_}++ foreach @$names;
        }

        # resolve storage object (relink reference into the metainfo)
        $meta->{storage} = $self->{container}->{storage}->{ $meta->{dbkey} };
    }

    $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" );

    # build mapping: collect the property names per partner, in mapping order.
    # A missing mapping is NOT autovivified here, so _buildMap() can tell
    # "no mapping given" apart from "empty mapping".
    foreach my $pair (@{ $self->{args}->{mapping} || [] }) {
        foreach my $side (@{$pair}[0, 1]) {
            my ($partner, $property) = split /:/, $side;
            push @{ $self->{meta}->{$partner}->{childnodes} }, $property;
        }
    }

    # check partners/nodes: does partner exist / is node available?
    foreach my $partner (keys %{ $self->{meta} }) {
        my $storage = $self->{meta}->{$partner}->{storage};
        my $node    = $self->{meta}->{$partner}->{node};

        if (!$storage) {
            $logger->critical( __PACKAGE__ . "->syncNodes: No storage found for \"$partner\"." );
            return;
        }

        # for DBD::CSV - re-enable for others
        my $type = $storage->{locator}->{type};
        next if defined $type && $type eq 'DBI';

        if (!$storage->existsChildNode($node)) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach \"$node\" at \"$partner\"." );
            return;
        }
    }

    # manipulate metainfo according to direction of synchronization:
    #   push - process as is
    #   pull - swap source and target metadata, then process
    #   full - TODO: should process, swap metadata and process again;
    #          currently handled exactly like push
    if ($direction eq 'pull') {
        ($self->{meta}->{source}, $self->{meta}->{target}) =
            ($self->{meta}->{target}, $self->{meta}->{source});
    }

    # import flag means: prepare the source node to be syncable
    # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage
    if ($self->{args}->{import}) {
        $self->_prepareNode_MetaProperties('source');
        $self->_prepareNode_DummyIdent('source');
    }

    return $self->_syncNodes();
}
195    
196    
# Core synchronization loop, called by syncNodes() once the metadata is set up.
#
# The source nodes come from $self->{args}->{objectSet} if given, otherwise
# from _getNodeList('source', $filter) when a source filter is configured, or
# from _getNodeList('source') when it is not. For every node it:
#   1. runs the "read" callbacks and removes excluded properties,
#   2. resolves the node ident (aborts the whole run if there is none),
#   3. stat/loads the target counterpart (failure => node is "new"),
#   4. with method "checksum": compares source and target checksums to
#      decide "new" / "dirty" (or "in sync", which is skipped),
#   5. builds the transfer map and inserts/updates the target, unless the
#      target storage is write-protected,
#   6. writes the target ident back to the source if the target is an
#      ident authority.
# With $self->{verbose} a progress character is printed per step:
# n=new, s=skipped, c=checksums compared, t=transfer ok, e=error,
# r=ident retransmitted to source, ":"=node done.
#
# Returns the statistics object, or an empty return if there is nothing to
# synchronize or a source node has no ident.
# TODO: abstract the hardwired use of "source" and "target" in here somehow
sub _syncNodes {
    my $self = shift;

    my $tc = OneLineDumpHash->new( {} );

    # set of objects is already in $self->{args}
    # TODO: make independent of the terminology "object..."
    my $results = $self->{args}->{objectSet};

    # Otherwise fetch the node list: filtered if a filter is configured,
    # unfiltered if not. A filter that yields nothing must not fall back to the
    # unfiltered list - that would silently synchronize every source node.
    if (!$results) {
        if (my $filter = $self->{meta}->{source}->{filter}) {
            $results = $self->_getNodeList('source', $filter);
        }
        else {
            $results = $self->_getNodeList('source');
        }
    }

    # checkpoint: do we actually have a list to iterate through?
    if (!$results || !@{$results}) {
        $logger->notice( __PACKAGE__ . "->syncNodes: No nodes to synchronize." );
        return;
    }

    # synchronization method, normalized once ('' if not given)
    my $method = defined $self->{args}->{method} ? lc $self->{args}->{method} : '';

    # iterate over a copy of the list
    my @nodes = @{$results};

    NODE:
    foreach my $source_node (@nodes) {

        $tc->{total}++;

        # NOTE: $source_node is the list element itself, not a clone - the read
        # callbacks and the exclude list below modify it in place. A deep copy
        # was deliberately avoided by the original author (fear of breaking
        # Tangram mechanisms), so take care that this object is not persisted.

        # handle "read" callbacks right now (asymmetric to the writers):
        # "<node>::<property>_read" is called with { object, property,
        # value => undef, storage } and its scalar result replaces the
        # property value. Errors (e.g. an undefined callback) propagate.
        my $callbacks = $self->{meta}->{source}->{Callback};
        if ($callbacks && $callbacks->{read}) {
            foreach my $property (keys %{ $callbacks->{read} }) {
                my $sub_name = $self->{meta}->{source}->{node} . '::' . $property . '_read';
                my $code = do { no strict 'refs'; \&{$sub_name} };
                $source_node->{$property} = $code->( {
                    object   => $source_node,
                    property => $property,
                    value    => undef,
                    storage  => $self->{meta}->{source}->{storage},
                } );
            }
        }

        # exclude defined fields (simply delete from object)
        if (my $exclude = $self->{meta}->{source}->{subnodes_exclude}) {
            delete @{$source_node}{ @$exclude };
        }

        # here we accumulate information about the status of the current node (payload/row/object/item/entry)
        $self->{node} = { source => { payload => $source_node } };

        # determine ident of entry
        if (!$self->_resolveNodeIdent('source')) {
            $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." );
            return;
        }

        # mark node as new if stat/load of the target counterpart failed
        if (!$self->_statloadNode('target', $self->{node}->{source}->{ident})) {
            $self->{node}->{status}->{new} = 1;
            print "n" if $self->{verbose};
        }

        # determine status of entry by synchronization method
        if ($method eq 'checksum') {

            # TODO: is this really worth a "critical"? maybe just debug
            if (!$self->_readChecksum('source')) {
                $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" );
                $tc->{skip}++;
                print "s" if $self->{verbose};
                next NODE;
            }

            # get checksum from synchronization target (may legitimately be missing)
            $self->_readChecksum('target');

            # determine if entry is "new" or "dirty" - this is where the hammer falls
            print "c" if $self->{verbose};
            my $source_cs = $self->{node}->{source}->{checksum};
            my $target_cs = $self->{node}->{target}->{checksum};

            # no checksum at the target: the node is new there
            $self->{node}->{status}->{new} = !$target_cs;
            if ($target_cs) {
                $self->{node}->{status}->{dirty} =
                    !$source_cs || $source_cs ne $target_cs || $self->{args}->{force};
            }
        }

        # first reaction on entry-status: continue with next entry if the current is already "in sync"
        if (!$self->{node}->{status}->{new} && !$self->{node}->{status}->{dirty}) {
            $tc->{in_sync}++;
            next NODE;
        }

        # build map to actually transfer the data from source to target
        $self->_buildMap();

        # feature "write-protection": never insert into / modify a protected target
        if ($self->{meta}->{target}->{storage}->{isWriteProtected}) {
            $tc->{attempt_transfer}++;
            print "\n" if $self->{verbose};
            $logger->notice( __PACKAGE__ . "->syncNodes: Target is write-protected. Will not insert or modify node. " .
                "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" );
            print "\n" if $self->{verbose};
            $tc->{skip}++;
            next NODE;
        }

        # transfer contents of map to target
        if ($self->{node}->{status}->{new}) {
            $tc->{attempt_new}++;
            $self->_doTransferToTarget('insert');
            # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?)
            $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1);
            $self->_readChecksum('target');
        }
        elsif ($self->{node}->{status}->{dirty}) {
            $tc->{attempt_modify}++;
            # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?)
            $self->{node}->{target}->{ident} = $self->{node}->{map}->{ $self->{meta}->{target}->{IdentProvider}->{arg} };
            $self->_doTransferToTarget('update');
            $self->_readChecksum('target');
        }

        if ($self->{node}->{status}->{ok}) {
            $tc->{ok}++;
            print "t" if $self->{verbose};
        }

        if ($self->{node}->{status}->{error}) {
            $tc->{error}++;
            push @{ $tc->{error_per_row} }, $self->{node}->{status}->{error};
            print "e" if $self->{verbose};
        }

        # change ident in source (take from target), if transfer was ok and target is an IdentAuthority
        # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing
        if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) {
            $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident});
            print "r" if $self->{verbose};
        }

        print ":" if $self->{verbose};
    }

    print "\n" if $self->{verbose};

    # build user-message from some stats
    my $msg = "stats: $tc";
    if ($tc->{error_per_row}) {
        $msg .= "\n" . "errors:" . "\n" . Dumper($tc->{error_per_row});
    }

    # TODO: sysevent( { usermsg => $msg, level => $level }, $taskEvent );
    $logger->info( __PACKAGE__ . "->syncNodes: $msg" );

    return $tc;
}
442    
443    
# Serialize the given hash refs into one compact (single-line) Data::Dumper
# string; used as the input of the checksum calculation.
#
# Each hash is shallow-copied; values that are Set::Object instances are
# replaced by the result of their members() call. The Dumper configuration
# is changed only for the duration of this call (via local): indentation is
# switched off, and hash keys are sorted so the output - and every checksum
# derived from it - does not depend on Perl's per-process hash ordering.
# Previously the global $Data::Dumper::Indent was forced to 2 afterwards,
# clobbering whatever the caller had configured.
sub _dumpCompact {
    my $self = shift;

    my @data;
    foreach my $record (@_) {
        my %item;
        foreach my $key (keys %$record) {
            my $val = $record->{$key};
            # NOTE(review): members() is evaluated in scalar context here, so
            # only what Set::Object returns in that context ends up in the
            # dump, not the complete member list - confirm this is intended.
            $item{$key} = ref $val eq 'Set::Object' ? $val->members() : $val;
        }
        push @data, \%item;
    }

    local $Data::Dumper::Indent   = 0;
    local $Data::Dumper::Sortkeys = 1;

    return Dumper(@data);
}
480    
481    
# Compute the checksum of the current node on side $descent ('source' or
# 'target') and store it in $self->{node}->{$descent}->{checksum}.
#
# The fingerprint input is the node ident, a newline, and the compact
# one-line dump of the payload (see _dumpCompact). It is hashed with
# DBI::hash(..., 1), a 32-bit integer hash provided by DBI.
# A third "specifier" argument is accepted for compatibility but unused.
# Always returns 1.
sub _calcChecksum {
    my ($self, $descent) = @_;

    my $ident   = $self->{node}->{$descent}->{ident};
    my $payload = $self->{node}->{$descent}->{payload};

    # ident line followed by the one-line payload dump
    my $fingerprint_source = join "\n", $ident, $self->_dumpCompact($payload);

    # TODO: $logger->dump( ... );

    $self->{node}->{$descent}->{checksum} = DBI::hash($fingerprint_source, 1);

    return 1;
}
511    
512    
# Fill in $self->{node}->{$descent}->{checksum} for the current node.
#
# If the storage behind $descent is tagged as checksum authority, the
# checksum is computed on the fly via _calcChecksum(); otherwise it is taken
# from the payload's "cs" property.
# Returns 1, or an empty return when nothing is loaded for $descent.
# TODO: don't have the checksum column/property hardcoded as "cs" here, make this configurable somehow
sub _readChecksum {
    my ($self, $descent) = @_;

    # no node loaded for this side: signal "checksum bad"
    return unless $self->{node}->{$descent};

    my $is_authority = $self->{meta}->{$descent}->{storage}->{isChecksumAuthority};
    if ($is_authority) {
        $self->_calcChecksum($descent);
    }
    else {
        $self->{node}->{$descent}->{checksum} = $self->{node}->{$descent}->{payload}->{cs};
    }

    # signal checksum good
    return 1;
}
544    
545    
# Build the transfer map of the current node in $self->{node}->{map}
# (target property name => value to write):
#   - the target ident property (target IdentProvider arg) gets the source ident,
#   - "cs" gets the source checksum,
#   - for a "mapping", the n-th source childnode property is copied to the
#     n-th target childnode property; values are passed through quotesql()
#     when the target storage is of type DBI.
# The last per-property working values remain available in
# $self->{node}->{source|target}->{propcache}.
# An empty mapping (no childnodes collected) is handled gracefully; it used
# to die dereferencing the missing childnodes list.
sub _buildMap {
    my $self = shift;

    # mapping of target fieldnames/attributes to values
    my $map = $self->{node}->{map} = {};

    # manually set object-id and checksum
    $map->{ $self->{meta}->{target}->{IdentProvider}->{arg} } = $self->{node}->{source}->{ident};
    $map->{cs} = $self->{node}->{source}->{checksum};

    # for transferring flat structures via simple (1:1) mapping
    # TODO: diff per property / property value
    if ($self->{args}->{mapping}) {
        my @source_props = @{ $self->{meta}->{source}->{childnodes} || [] };
        my @target_props = @{ $self->{meta}->{target}->{childnodes} || [] };

        foreach my $idx (0 .. $#source_props) {
            my $source_prop = $source_props[$idx];
            my $target_prop = $target_props[$idx];

            # plain (scalar) value of the source property
            my $value = $self->{node}->{source}->{payload}->{$source_prop};

            # encode values dependent on type of underlying storage here - expand cases...
            # (Tangram: nothing to do; LDAP: TODO encode utf8 here?)
            my $storage_type = $self->{meta}->{target}->{storage}->{locator}->{type};
            if (defined $storage_type && $storage_type eq 'DBI') {
                $value = quotesql($value);
            }

            # keep the per-property working values where they used to live
            $self->{node}->{source}->{propcache}->{property} = $source_prop;
            $self->{node}->{target}->{propcache}->{property} = $target_prop;
            $self->{node}->{source}->{propcache}->{value}    = $value;

            # store value to transfer map
            $map->{$target_prop} = $value;
        }
    }

    # for transferring deeply nested structures described by expressions
    # NOTE: marked by the original author as "currently does not work"; it
    # relies on _resolveMapStepExpr() and $self->{R}->quoteSql().
    # TODO: re-enable this!
    if ($self->{args}->{mappingV2}) {
        foreach my $mapStep (@{ $self->{args}->{mappingV2} }) {
            my $left_key  = $mapStep->{left};
            my $left_val  = _resolveMapStepExpr( $self->{node}->{source}->{payload}, $mapStep->{left} );
            my $right_key = $mapStep->{right};

            # method "v:1": use the left expression itself as value
            if ($mapStep->{method} && $mapStep->{method} eq 'v:1') {
                $left_val = $left_key;
            }

            $map->{$right_key} = $self->{R}->quoteSql($left_val);
        }
    }

    # TODO: $logger->dump( ... );
    return;
}
642    
# Determine the ident of the current node on side $descent and store it in
# $self->{node}->{$descent}->{ident}.
#
# The lookup is configured by $self->{meta}->{$descent}->{IdentProvider}
# ({ method => ..., arg => ... }):
#   "property"       - the ident is the payload property named arg
#   "storage_method" - the ident is $storage->$arg($payload), called on the
#                      partner's storage object
# Any other or missing method leaves the ident undefined.
#
# Returns 1 if a true ident was found, otherwise an empty return (undef in
# scalar context, empty list in list context).
sub _resolveNodeIdent {
    my ($self, $descent) = @_;

    my $payload  = $self->{node}->{$descent}->{payload};
    my $provider = $self->{meta}->{$descent}->{IdentProvider} || {};
    my $method   = defined $provider->{method} ? $provider->{method} : '';
    my $arg      = $provider->{arg};

    my $ident;
    if ($method eq 'property') {
        $ident = $payload->{$arg};
    }
    elsif ($method eq 'storage_method') {
        $ident = $self->{meta}->{$descent}->{storage}->$arg($payload);
    }

    $self->{node}->{$descent}->{ident} = $ident;

    return $ident ? 1 : ();
}
676    
677    
# Write the transfer map to the node on side $descent (normally 'target').
#
# Parameters:
#   $descent - 'source' or 'target'; selects metadata and storage
#   $action  - 'insert' or 'update' (case-insensitive)
#   $map     - hash ref: property name => value; modified in place
#              (write-callback and excluded properties are removed)
#   $crit    - optional SQL criteria for DBI updates; defaults to
#              "<ident property>='<current ident>'"
#
# The outcome is recorded in $self->{node}->{status}: {ok} = 1 on success,
# or {error} = hash ref describing the failure.
#
# Storage types handled:
#   DBI     - SQL built with hash2Sql() and sent via $storage->sendCommand()
#   Tangram - object created (insert) or loaded (update), filled with
#             hash2object() and updated; afterwards the write callbacks
#             "<node>::<property>_write" are called. If a callback fails, a
#             freshly inserted object is erased again (rollback). An object
#             that already existed (update) is NOT erased.
# NOTE(review): write callbacks are removed from $map for every storage type
# but only executed for Tangram.
# TODO: count untouched nodes in the caller and report e.g. "No nodes have
# been touched for modify: Do you have column-headers in your csv file?"
sub _modifyNode {
    my $self    = shift;
    my $descent = shift;
    my $action  = shift;
    my $map     = shift;
    my $crit    = shift;

    $action = defined $action ? lc $action : '';

    my $meta         = $self->{meta}->{$descent};
    my $storage      = $meta->{storage};
    my $storage_type = $storage->{locator}->{type};
    $storage_type = '' if !defined $storage_type;

    # transfer callback nodes from value map to callback map - they are
    # handled after the node has been written (new style callbacks)
    my %write_callback_value;
    if (my $callbacks = $meta->{Callback}) {
        foreach my $property (keys %{ $callbacks->{write} || {} }) {
            $write_callback_value{$property} = delete $map->{$property};
        }
    }

    if ($storage_type eq 'DBI') {
        # DBI speaks SQL
        my $sql_main;
        if ($action eq 'insert') {
            $sql_main = hash2Sql($meta->{node}, $map, 'SQL_INSERT');
        }
        elsif ($action eq 'update') {
            # NOTE(review): the ident is interpolated unescaped into the SQL
            # criteria - an ident containing a quote breaks the statement.
            $crit ||= "$meta->{IdentProvider}->{arg}='$self->{node}->{$descent}->{ident}'";
            $sql_main = hash2Sql($meta->{node}, $map, 'SQL_UPDATE', $crit);
        }
        else {
            $self->{node}->{status}->{error} = { errstr => "unknown action \"$action\"" };
            return;
        }

        # transfer data
        my $sqlHandle = $storage->sendCommand($sql_main);

        # handle errors
        if (!$sqlHandle) {
            $self->{node}->{status}->{error} = {
                statement => $sql_main,
                errstr    => 'sendCommand returned no statement handle',
            };
        }
        elsif ($sqlHandle->err) {
            $self->{node}->{status}->{error} = {
                statement => $sql_main,
                state     => $sqlHandle->state,
                err       => $sqlHandle->err,
                errstr    => $sqlHandle->errstr,
            };
        }
        else {
            $self->{node}->{status}->{ok} = 1;
        }
    }
    elsif ($storage_type eq 'Tangram') {
        # Tangram does it the oo-way (naturally)
        my $classname = $meta->{node};

        # properties to exclude (the exclude list itself is optional)
        my @exclude = @{ $meta->{subnodes_exclude} || [] };
        if (my $identProvider = $meta->{IdentProvider}) {
            push @exclude, $identProvider->{arg};
        }

        # TypeProvider: its argument names the map property holding the
        # classname to instantiate on this side; that property is not stored
        if (my $typeProvider = $meta->{TypeProvider}) {
            $classname = $map->{ $typeProvider->{arg} };
            push @exclude, $typeProvider->{arg};
        }

        # exclude banned properties (remove from map)
        delete @{$map}{@exclude};

        # list of properties
        my @props = keys %{$map};

        my $object;
        if ($action eq 'insert') {
            # make the object persistent in four steps:
            # - raw create (perl / class tangram scope)
            # - engine insert (tangram scope) ... this establishes inheritance - don't try to fill in inherited properties before!
            # - raw fill-in from hash (perl scope)
            # - engine update (tangram scope) ... this updates all properties just filled in
            $object = $classname->new();
            $storage->insert($object);
            $object->{$_} = undef foreach @props;
            hash2object($object, $map);
            $storage->update($object);

            # asymmetry: get ident after insert
            # TODO: just do this if it is an IdentAuthority; use IdentProvider metadata here
            $self->{node}->{$descent}->{ident} = $storage->id($object);
        }
        elsif ($action eq 'update') {
            # get fresh object from orm first, then mix in values
            $object = $storage->load($self->{node}->{$descent}->{ident});
            hash2object($object, $map);
            $storage->update($object);
        }
        else {
            $self->{node}->{status}->{error} = { errstr => "unknown action \"$action\"" };
            return;
        }

        # handle new style write callbacks; once one has failed, no further
        # re-updates are sent to the orm
        my $callback_error;
        foreach my $property (keys %write_callback_value) {
            my $sub_name = $meta->{node} . '::' . $property . '_write';
            my $ok = eval {
                my $code = do { no strict 'refs'; \&{$sub_name} };
                $code->( {
                    object  => $object,
                    value   => $write_callback_value{$property},
                    storage => $storage,
                } );
                1;
            };
            if (!$ok) {
                my $reason = $@;
                $callback_error ||= { callback => $sub_name, errstr => $reason };
                print $reason, "\n";
            }

            # re-update@orm
            $storage->update($object) if !$callback_error;
        }

        # handle errors
        if ($callback_error) {
            $self->{node}->{status}->{error} = $callback_error;
            # rollback: only an object created by this call may be erased
            $storage->erase($object) if $action eq 'insert';
        }
        else {
            $self->{node}->{status}->{ok} = 1;
        }
    }

    return;
}
883    
884     # TODO:
885     # this should be split up into...
886     # - a "_statNode" (should just touch the node to check for existence)
887     # - a "_loadNode" (should load node completely)
888     # - maybe additionally a "loadNodeProperty" (may specify properties to load)
889     # - introduce $self->{nodecache} for this purpose
890     # TODO:
891     # should we:
892     # - not pass ident in here but resolve it via "$descent"?
893     # - refactor this and stuff it with additional debug/error message
894     # - this = the way the implicit load mechanism works
# Ensure the node of side $descent is cached in $self->{node}->{$descent}.
#
# When a node is already cached and $force is false, nothing is queried and
# 1 is returned. Otherwise the storage of that side is asked (via sendQuery)
# for the entry whose IdentProvider property equals $ident, requesting the
# "cs" subnode as well. On success the ident and the fetched entry
# ("payload") are cached and 1 is returned.
#
# Parameters:
#   $descent - side name, used as key into $self->{meta} and $self->{node}
#   $ident   - ident value to look up
#   $force   - true to reload even if a node is already cached
#
# Returns 1 on success. Returns empty (undef in scalar context) when no ident
# was given, the query status reports an error, or no entry came back.
sub _statloadNode {
    my ($self, $descent, $ident, $force) = @_;

    # Already cached and no reload requested: nothing to do.
    return 1 if $self->{node}->{$descent} && !$force;

    # Nothing to look up without an ident; this may result in an insert
    # later on if no write-protection is in the way.
    return if !$ident;

    my $result = $self->{meta}->{$descent}->{storage}->sendQuery({
        node      => $self->{meta}->{$descent}->{node},
        subnodes  => [qw( cs )],
        criterias => [
            {
                key => $self->{meta}->{$descent}->{IdentProvider}->{arg},
                op  => 'eq',
                val => $ident,
            },
        ],
    });

    my $entry  = $result->getNextEntry();
    my $status = $result->getStatus();

    # TODO: enhance error handling (store inside tc)
    my $lookup_failed = ($status && $status->{err}) || !$entry;
    return if $lookup_failed;

    # TODO: re-resolve ident from entry via metadata "IdentProvider"
    $self->{node}->{$descent}->{ident}   = $ident;
    $self->{node}->{$descent}->{payload} = $entry;

    return 1;
}
946    
# Apply the prepared node map ($self->{node}->{map}) to the target side
# with the given action (e.g. 'update') via _modifyNode, and return
# whatever _modifyNode returns.
sub _doTransferToTarget {
    my ($self, $action) = @_;
    my $node_map = $self->{node}->{map};
    return $self->_modifyNode('target', $action, $node_map);
}
952    
# Stamp the source node with a new ident and the target node's checksum.
#
# Builds the property map
#   { <source IdentProvider arg> => $ident_new, cs => <target checksum> }
# and applies it to the source side through an 'update' via _modifyNode,
# returning its result.
#
# TODO:
#  - eventually introduce an external resource to store this data to
#  - we won't have to "re"-modify the source node here
sub _doModifySource_IdentChecksum {
    my ($self, $ident_new) = @_;

    my $ident_property  = $self->{meta}->{source}->{IdentProvider}->{arg};
    my $target_checksum = $self->{node}->{target}->{checksum};

    my %changes = (
        $ident_property => $ident_new,
        cs              => $target_checksum,
    );

    return $self->_modifyNode('source', 'update', \%changes);
}
969    
970    
971     # this is a shortcut method
972     # ... let's try to avoid _any_ redundant code in here (ok... - at the cost of method lookups...)
sub _getNodeList {
    # Fetch the (optionally filtered) node list of side $descent by calling
    # getListFiltered(<node name>, $filter) on that side's storage; the
    # storage's return value is handed back unchanged.
    my ($self, $descent, $filter) = @_;
    my $storage   = $self->{meta}->{$descent}->{storage};
    my $node_name = $self->{meta}->{$descent}->{node};
    return $storage->getListFiltered($node_name, $filter);
}
979    
980    
# Make sure the nodes of side $descent carry the meta properties the sync
# engine relies on: the ident property (named by the side's IdentProvider
# metadata) and the checksum property "cs".
#
# The first node of the side's node list is inspected. For every required
# property that node lacks, "ALTER TABLE <node> ADD COLUMN <property>" is
# sent to the side's storage via sendCommand. Properties that already exist
# are left alone. If the list is empty, the structure cannot be inspected
# and nothing is altered.
#
# Parameters:
#   $descent - side name, used as key into $self->{meta}
sub _prepareNode_MetaProperties {
    my $self = shift;
    my $descent = shift;

    $logger->info( __PACKAGE__ . "->_prepareNode_MetaProperties( descent $descent )" );

    # TODO: this should (better) be: "my $firstnode = $self->_getFirstNode($descent);"
    my $list = $self->_getNodeList($descent);

    # get first node
    my $firstnode = $list->[0];

    # Without any node there is nothing to inspect, so don't guess.
    if (!$firstnode) {
        $logger->notice( __PACKAGE__ . "->_prepareNode_MetaProperties: no nodes found - can not check for meta properties" );
        return;
    }

    # TODO: "cs" is hardcoded here!
    my @required = ( $self->{meta}->{$descent}->{IdentProvider}->{arg}, 'cs' );

    # Required properties the node does not have at all. Use exists rather
    # than defined: a present column with an undef value is not missing.
    my @missing = grep { !exists $firstnode->{$_} } @required;
    return if !@missing;

    $logger->warning( __PACKAGE__ . "->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter..." );
    foreach my $property (@missing) {
        # NOTE(review): no column type is given; drivers that need one, or
        # that lack ALTER support (e.g. DBD::CSV), will reject this command.
        my $sql = "ALTER TABLE $self->{meta}->{$descent}->{node} ADD COLUMN $property";
        # TODO: evaluate the result status of sendCommand
        my $res = $self->{meta}->{$descent}->{storage}->sendCommand($sql);
    }

    return;
}
1014    
# Assign dummy idents to the nodes of side $descent.
#
# For every node in the side's node list, the map
#   { <IdentProvider arg> => <dummy ident>, cs => undef }
# is written via _modifyNode($descent, 'update', $map, $crit). $crit is a
# condition of the form "col='value' AND ..." built from the node's
# properties that are not part of the map, to address that particular node.
# Dummy idents are ($ident_base + counter) followed by '0001'. The counter
# only advances for nodes that were actually passed to _modifyNode.
# With verbose set, a "p" is printed per touched node.
#
# Parameters:
#   $descent - side name, used as key into $self->{meta}
sub _prepareNode_DummyIdent {
    my $self = shift;
    my $descent = shift;

    $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" );

    my $list = $self->_getNodeList($descent);
    my $i = 0;
    my $ident_base = 5678983;
    my $ident_appendix = '0001';
    foreach my $node (@$list) {
        my $ident_dummy = ($i + $ident_base) . $ident_appendix;
        my $map = {
            $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy,
            cs => undef,
        };

        # diff lists and ...
        my $diff = getDifference([keys %$node], [keys %$map]);
        next if $#{$diff} == -1;

        # ... build criteria including all columns that carry a value.
        # Only undef and '' are skipped. A value of "0" is a real value and
        # must take part; dropping it (as a plain truth test would) loosens
        # the condition, which presumably lets it match more rows than this one.
        my @crits;
        foreach my $property (@$diff) {
            next if !$property;
            my $value = $node->{$property};
            next if !defined $value || $value eq '';
            push @crits, "$property='" . quotesql($value) . "'";
        }
        # NOTE(review): if no property carries a value, $crit is ''. How
        # _modifyNode treats an empty criterion is not visible here; verify.
        my $crit = join ' AND ', @crits;
        print "p" if $self->{verbose};
        $self->_modifyNode($descent, 'update', $map, $crit);
        $i++;
    }

    print "\n" if $self->{verbose};

    if (!$i) {
        $logger->warning( __PACKAGE__ . "->_prepareNode_DummyIdent: no nodes touched" );
    }

}
1059    
1060     # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core)
sub _otherSide {
    # Map a side name to its counterpart: 'source' <-> 'target'.
    # Any other value yields the empty string.
    my ($self, $side) = @_;
    my %opposite_of = (
        source => 'target',
        target => 'source',
    );
    return exists $opposite_of{$side} ? $opposite_of{$side} : '';
}
1068    
1069    
1070     =pod
1071    
1072    
1073     =head1 DESCRIPTION
1074    
1075     Data::Transfer::Sync is a module providing a generic synchronization process
1076     across arbitrary/multiple storages based on an ident/checksum mechanism.
1077     It sits on top of Data::Storage.
1078    
1079    
1080     =head1 REQUIREMENTS
1081    
1082     For full functionality:
1083     Data::Storage
1084     Data::Transform
1085     Data::Compare
1086     ... and all their dependencies
1087    
1088    
1089     =head1 AUTHORS / COPYRIGHT
1090    
1091     The Data::Transfer::Sync module is Copyright (c) 2002 Andreas Motl.
1092     All rights reserved.
1093    
1094     You may distribute it under the terms of either the GNU General Public
1095     License or the Artistic License, as specified in the Perl README file.
1096    
1097    
1098     =head1 SUPPORT / WARRANTY
1099    
1100     Data::Transfer::Sync is free software. IT COMES WITHOUT WARRANTY OF ANY KIND.
1101    
1102    
1103    
1104     =head1 BUGS
1105    
1106     When in "import" mode for a Windows file, DBD::AutoCSV may hang.
1107     Hint: Maybe the source node contains an ident-, but no checksum-column?
1108    
1109    
1110     =head1 USER LEVEL ERRORS
1111    
1112     =head4 Mapping
1113    
1114     - - - - - - - - - - - - - - - - - - - - - - - - - -
1115     info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )critical: BizWorks::Process::Setup->startSync: Can't access mapping for node "Currency" - please check BizWorks::ResourceMapping.
1116     - - - - - - - - - - - - - - - - - - - - - - - - - -
1117     You have to create a sub for each node used in synchronization inside the named Perl module (here: BizWorks::ResourceMapping). The name of this sub _must_ match
1118     the name of the node you want to sync. This sub holds mapping metadata which gives the engine hints about how
1119     to access the otherwise generic nodes.
1120     - - - - - - - - - - - - - - - - - - - - - - - - - -
1121    
1122    
1123     =head4 DBD::AutoCSV's rulebase
1124    
1125     - - - - - - - - - - - - - - - - - - - - - - - - - -
1126     info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )
1127     info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1128    
1129     Execution ERROR: Error while scanning: Missing first row or scanrule not applied. at C:/home/amo/develop/netfrag.org/nfo/perl/libs/DBD/CSV.p
1130     m line 165, <GEN9> line 1.
1131     called from C:/home/amo/develop/netfrag.org/nfo/perl/libs/Data/Storage/Handler/DBI.pm at 123.
1132    
1133     DBI-Error: DBD::AutoCSV::st fetchrow_hashref failed: Attempt to fetch row from a Non-SELECT statement
1134     notice: Data::Transfer::Sync->syncNodes: No nodes to synchronize.
1135     - - - - - - - - - - - - - - - - - - - - - - - - - -
1136     DBD::AutoCSV contains a rulebase which is worked through while attempting to guess the style of the CSV file regarding
1137     parameters like newline (eol), column separation character (sep_char) and quoting character (quote_char).
1138     If the rulebase runs out of entries and no style could be resolved, DBD::CSV dies, causing this "Execution ERROR", which
1139     results in a "DBI-Error" afterwards.
1140     - - - - - - - - - - - - - - - - - - - - - - - - - -
1141    
1142    
1143     =head4 Check structure of source node
1144    
1145     - - - - - - - - - - - - - - - - - - - - - - - - - -
1146     info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1147     critical: Data::Transfer::Sync->syncNodes: Can not synchronize: No ident found in source node, maybe try to "import" this node first.
1148     - - - - - - - - - - - - - - - - - - - - - - - - - -
1149     If low-level detection succeeds but other required information is missing, this message is issued.
1150     Missing information might be:
1151     - column-header-row completely missing
1152     - ident column is empty
1153     - - - - - - - - - - - - - - - - - - - - - - - - - -
1154    
1155    
1156     =head4 Modify structure of source node
1157    
1158     - - - - - - - - - - - - - - - - - - - - - - - - - -
1159     info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv
1160     info: Data::Transfer::Sync->_prepareNode_MetaProperties( descent source )
1161     warning: Data::Transfer::Sync->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter...
1162     SQL ERROR: Command 'ALTER' not recognized or not supported!
1163    
1164     SQL ERROR: Command 'ALTER' not recognized or not supported!
1165     - - - - - - - - - - - - - - - - - - - - - - - - - -
1166     The engine found a node whose structure does not match the required one. It tries to alter it automatically (only when doing "import"),
1167     but the DBD driver (in this case DBD::CSV) gets in the way, croaking that it is unable to do this.
1168     This could also appear if your database connection has insufficient rights to modify the database structure.
1169     DBD::CSV croaks because it doesn't implement the ALTER command, so please edit your columns manually.
1170     Hint: Add columns with the names of your "ident" and "checksum" property specifications.
1171     - - - - - - - - - - - - - - - - - - - - - - - - - -
1172    
1173    
1174     =head4 Load source node by ident
1175    
1176     - - - - - - - - - - - - - - - - - - - - - - - - - -
1177     info: Data::Transfer::Sync->_prepareNode_DummyIdent( descent source )
1178     pcritical: Data::Transfer::Sync->_modifyNode failed: "source" node is empty.
1179     - - - - - - - - - - - - - - - - - - - - - - - - - -
1180     The source node could not be loaded. Maybe the ident is missing. Please check manually.
1181     Hint: Like above, the ident and/or checksum columns may be missing....
1182     - - - - - - - - - - - - - - - - - - - - - - - - - -
1183    
1184    
1185     =head1 TODO
1186    
1187     - sub _resolveIdentProvider
1188     - wrap _doModifySource_IdentChecksum and _doTransferToTarget around a core function which can change virtually any type of node
1189     - split this module up into Sync.pm, Sync/Core.pm, Sync/Compare.pm and Sync/Compare/Checksum.pm
1190     - introduce _compareNodes as a core method and wrap it around methods in Sync/Compare/Checksum.pm
1191     - introduce Sync/Compare/MyComparisonImplementation.pm
1192     - some generic deferring method - e.g. "$self->defer(action)" - to be able to accumulate a bunch of actions for later processing
1193     - this implies everything done is _really_ split up into generic actions - how else would we defer them???
1194     - example uses:
1195     - fetch whole checksum list from node
1196     - remember source ident retransmits
1197             - remember: this is convenient - and maybe / of course faster - but we'll lose "per-node-atomic" operations
1198     - feature: mechanism to implicitly inject checksum property to nodes (alter table / modify schema)
1199     - expand statistics / keep track of:
1200     - touched/untouched nodes
1201     - full sync
1202     - just do a push and a pull for now but use stats for touched nodes in between to speed up things
1203     - introduce some new metadata flags for a synchronization partner which is (e.g.) of "source" or "target":
1204     - isNewNodePropagator
1205     - isWriteProtected
1206    
1207    
1208     =cut
1209    
1210     1;

MailToCvsAdmin">MailToCvsAdmin
ViewVC Help
Powered by ViewVC 1.1.26 RSS 2.0 feed