--- nfo/perl/libs/Data/Transfer/Sync.pm 2002/11/29 04:45:50 1.1 +++ nfo/perl/libs/Data/Transfer/Sync.pm 2002/12/13 21:49:34 1.7 @@ -1,4 +1,4 @@ -## $Id: Sync.pm,v 1.1 2002/11/29 04:45:50 joko Exp $ +## $Id: Sync.pm,v 1.7 2002/12/13 21:49:34 joko Exp $ ## ## Copyright (c) 2002 Andreas Motl ## @@ -6,6 +6,32 @@ ## ## ---------------------------------------------------------------------------------------- ## $Log: Sync.pm,v $ +## Revision 1.7 2002/12/13 21:49:34 joko +## + sub configure +## + sub checkOptions +## +## Revision 1.6 2002/12/06 04:49:10 jonen +## + disabled output-puffer here +## +## Revision 1.5 2002/12/05 08:06:05 joko +## + bugfix with determining empty fields (Null) with DBD::CSV +## + debugging +## + updated comments +## +## Revision 1.4 2002/12/03 15:54:07 joko +## + {import}-flag is now {prepare}-flag +## +## Revision 1.3 2002/12/01 22:26:59 joko +## + minor cosmetics for logging +## +## Revision 1.2 2002/12/01 04:43:25 joko +## + mapping deatil entries may now be either an ARRAY or a HASH +## + erase flag is used now (for export-operations) +## + expressions to refer to values inside deep nested structures +## - removed old mappingV2-code +## + cosmetics +## + sub _erase_all +## ## Revision 1.1 2002/11/29 04:45:50 joko ## + initial check in ## @@ -20,29 +46,48 @@ use warnings; use Data::Dumper; +#use Hash::Merge qw( merge ); + use misc::HashExt; use libp qw( md5_base64 ); use libdb qw( quotesql hash2Sql ); -use Data::Transform::OO qw( hash2object ); +use Data::Transform::Deep qw( hash2object refexpr2perlref ); use Data::Compare::Struct qw( getDifference isEmpty ); # get logger instance my $logger = Log::Dispatch::Config->instance; +$| = 1; sub new { my $invocant = shift; my $class = ref($invocant) || $invocant; - my $self = { @_ }; + my $self = {}; $logger->debug( __PACKAGE__ . "->new(@_)" ); bless $self, $class; - $self->_init(); + $self->configure(@_); return $self; } +sub configure { + my $self = shift; + my @args = @_; + if (!isEmpty(\@args)) { + my %properties = @_; + # merge args to properties + map { $self->{$_} = $properties{$_}; } keys %properties; + $self->_init(); + } else { + #print "no args!", "\n"; + } + #print Dumper($self); +} + sub _init { my $self = shift; + + $self->{configured} = 1; # build new container if necessary $self->{container} = Data::Storage::Container->new() if !$self->{container}; @@ -61,12 +106,109 @@ } +sub prepareOptions { + + my $self = shift; + my $opts = shift; + +#print Dumper($opts); + + $opts->{mode} ||= ''; + $opts->{erase} ||= 0; + #$opts->{import} ||= 0; + + $logger->info( __PACKAGE__ . "->prepareOptions( source_node $opts->{source_node} mode $opts->{mode} erase $opts->{erase} prepare $opts->{prepare} )"); + + if (!$opts->{mapping} || !$opts->{mapping_module}) { + $logger->warning( __PACKAGE__ . "->prepareOptions: No mapping supplied - please check key 'mappings' in BizWorks/Config.pm"); + } + + my $evstring = "use $opts->{mapping_module};"; + eval($evstring); + if ($@) { + $logger->warning( __PACKAGE__ . "->prepareOptions: error while trying to access mapping - $@"); + return; + } + + # resolve mapping metadata (returned from sub) + my $mapObject = $opts->{mapping_module}->new(); + #print Dumper($map); + my $source_node_name = $opts->{source_node}; + # check if mapping for certain node is contained in mapping object + if (!$mapObject->can($source_node_name)) { + $logger->warning( __PACKAGE__ . "->prepareOptions: Can't access mapping for node \"$source_node_name\" - please check $opts->{mapping_module}."); + return; + } + my $map = $mapObject->$source_node_name; + + # remove asymmetries from $map (patch keys) + $map->{source_node} = $map->{source}; delete $map->{source}; + $map->{target_node} = $map->{target}; delete $map->{target}; + $map->{mapping} = $map->{details}; delete $map->{details}; + $map->{direction} = $map->{mode}; delete $map->{mode}; + + # defaults (mostly for backward-compatibility) + $map->{source_node} ||= $source_node_name; + $map->{source_ident} ||= 'storage_method:id'; + $map->{target_ident} ||= 'property:oid'; + $map->{direction} ||= $opts->{mode}; # | PUSH | PULL | FULL + $map->{method} ||= 'checksum'; # | timestamp + $map->{source_exclude} ||= [qw( cs )]; + + # merge map to opts + map { $opts->{$_} = $map->{$_}; } keys %$map; + +#print Dumper($opts); + + # TODO: move this to checkOptions... + + # check - do we have a target? + if (!$opts->{target_node}) { + $logger->warning( __PACKAGE__ . "->prepareOptions: No target given - please check metadata declaration."); + return; + } + + + #return $opts; + return 1; + +} + + +sub checkOptions { + my $self = shift; + my $opts = shift; + + my $result = 1; + + # check - do we have a target node? + if (!$opts->{target_node}) { + $logger->warning( __PACKAGE__ . "->checkOptions: Error while resolving resource metadata - no 'target node' could be determined."); + $result = 0; + } + + # check - do we have a mapping? + if (!$opts->{mapping} && !$opts->{mapping_module}) { + $logger->warning( __PACKAGE__ . "->checkOptions: Error while resolving resource metadata - no 'mapping' could be determined."); + $result = 0; + } + + return $result; + +} + + # TODO: some feature to show off the progress of synchronization (cur/max * 100) sub syncNodes { my $self = shift; my $args = shift; + if (!$self->{configured}) { + $logger->critical( __PACKAGE__ . "->syncNodes: Synchronization object is not configured/initialized correctly." ); + return; + } + # remember arguments through the whole processing $self->{args} = $args; @@ -89,7 +231,7 @@ } # decompose identifiers for each partner - # TODO: take this list from already established/given metadata + # TODO: refactor!!! take this list from already established/given metadata foreach ('source', 'target') { # get/set metadata for further processing @@ -144,16 +286,37 @@ #print "iiiiisprov: ", Dumper($self->{meta}->{$_}->{storage}), "\n"; } +#print Dumper($self->{meta}); + $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" ); # build mapping + # incoming: and Array of node map entries (Array or Hash) - e.g. + # [ 'source:item_name' => 'target:class_val' ] + # { source => 'event->startDateTime', target => 'begindate' } foreach (@{$self->{args}->{mapping}}) { - my @key1 = split(':', $_->[0]); - my @key2 = split(':', $_->[1]); - push @{$self->{meta}->{$key1[0]}->{childnodes}}, $key1[1]; - push @{$self->{meta}->{$key2[0]}->{childnodes}}, $key2[1]; + if (ref $_ eq 'ARRAY') { + my @entry1 = split(':', $_->[0]); + my @entry2 = split(':', $_->[1]); + my $descent = []; + my $node = []; + $descent->[0] = $entry1[0]; + $descent->[1] = $entry2[0]; + $node->[0] = $entry1[1]; + $node->[1] = $entry2[1]; + push @{$self->{meta}->{$descent->[0]}->{childnodes}}, $node->[0]; + push @{$self->{meta}->{$descent->[1]}->{childnodes}}, $node->[1]; + } elsif (ref $_ eq 'HASH') { + foreach my $entry_key (keys %$_) { + my $entry_val = $_->{$entry_key}; + push @{$self->{meta}->{$entry_key}->{childnodes}}, $entry_val; + } + } + } +#print Dumper($self->{meta}); + # check partners/nodes: does partner exist / is node available? foreach my $partner (keys %{$self->{meta}}) { next if $self->{meta}->{$partner}->{storage}->{locator}->{type} eq 'DBI'; # for DBD::CSV - re-enable for others @@ -185,13 +348,21 @@ # import flag means: prepare the source node to be syncable # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage - if ($self->{args}->{import}) { + if ($self->{args}->{prepare}) { $self->_prepareNode_MetaProperties('source'); $self->_prepareNode_DummyIdent('source'); #return; #$self->_erase_all($opts->{source_node}); } + # erase flag means: erase the target + #if ($opts->{erase}) { + if ($self->{args}->{erase}) { + # TODO: move this method to the scope of the synchronization core and wrap it around different handlers + #print "ERASE", "\n"; + $self->_erase_all('target'); + } + $self->_syncNodes(); } @@ -216,7 +387,7 @@ $results ||= $self->_getNodeList('source', $filter); } - # get reference to node list from convenient method provided by corehandle + # get reference to node list from convenient method provided by CORE-HANDLE #$results ||= $self->{source}->getListUnfiltered($self->{meta}->{source}->{node}); #$results ||= $self->{meta}->{source}->{storage}->getListUnfiltered($self->{meta}->{source}->{node}); $results ||= $self->_getNodeList('source'); @@ -294,14 +465,18 @@ my $identOK = $self->_resolveNodeIdent('source'); #if (!$identOK && lc $self->{args}->{direction} ne 'import') { if (!$identOK) { - $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." ); + #print Dumper($self->{meta}->{source}); + $logger->critical( __PACKAGE__ . "->syncNodes: No ident found in source node \"$self->{meta}->{source}->{node}\", try to \"prepare\" this node first?" ); return; } #print "statload", "\n"; #print "ident: ", $self->{node}->{source}->{ident}, "\n"; +#print Dumper($self->{node}); my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident}); + +#print Dumper($self->{node}); # mark node as new either if there's no ident or if stat/load failed if (!$statOK) { @@ -388,6 +563,7 @@ $tc->{attempt_new}++; $self->_doTransferToTarget('insert'); # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?) + #print Dumper($self->{node}); $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1); $self->_readChecksum('target'); @@ -413,11 +589,11 @@ # change ident in source (take from target), if transfer was ok and target is an IdentAuthority # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) { + print "r" if $self->{verbose}; #print Dumper($self->{meta}); #print Dumper($self->{node}); #exit; $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident}); - print "r" if $self->{verbose}; } print ":" if $self->{verbose}; @@ -427,11 +603,11 @@ print "\n" if $self->{verbose}; # build user-message from some stats - my $msg = "stats: $tc"; + my $msg = "statistics: $tc"; if ($tc->{error_per_row}) { $msg .= "\n"; - $msg .= "errors:" . "\n"; + $msg .= "errors from \"error_per_row\":" . "\n"; $msg .= Dumper($tc->{error_per_row}); } @@ -455,30 +631,35 @@ my $item = {}; foreach my $key (keys %$_) { my $val = $_->{$key}; + +#print Dumper($val); + if (ref $val eq 'Set::Object') { #print "========================= SET", "\n"; - #print Dumper($val); +#print Dumper($val); #print Dumper($val->members()); #$val = $val->members(); #$vars->[$count]->{$key} = $val->members() if $val->can("members"); #$item->{$key} = $val->members() if $val->can("members"); $item->{$key} = $val->members(); #print Dumper($vars->[$count]->{$key}); + } else { $item->{$key} = $val; } + } push @data, $item; $count++; } -#print "Dump:", "\n"; -#print Dumper(@data); +#print "Dump:", Dumper(@data), "\n"; $Data::Dumper::Indent = 0; my $result = Dumper(@data); $Data::Dumper::Indent = 2; return $result; + } @@ -572,6 +753,9 @@ for (my $mapidx = 0; $mapidx <= $#childnodes; $mapidx++) { #my $map_right = $self->{args}->{mapping}->{$key}; + $self->{node}->{source}->{propcache} = {}; + $self->{node}->{target}->{propcache} = {}; + # get property name $self->{node}->{source}->{propcache}->{property} = $self->{meta}->{source}->{childnodes}->[$mapidx]; $self->{node}->{target}->{propcache}->{property} = $self->{meta}->{target}->{childnodes}->[$mapidx]; @@ -589,13 +773,39 @@ $self->{node}->{source}->{propcache}->{value} = $self->{node}->{source}->{payload}->{$self->{node}->{source}->{propcache}->{property}}; } #$self->{node}->{map}->{$key} = $value; + + # detect expression + # for transferring deeply nested structures described by expressions + #print "val: $self->{node}->{source}->{propcache}->{value}", "\n"; + if ($self->{node}->{source}->{propcache}->{property} =~ s/^expr://) { + + # create an anonymous sub to act as callback target dispatcher + my $cb_dispatcher = sub { + #print "=============== CALLBACK DISPATCHER", "\n"; + #print "ident: ", $self->{node}->{source}->{ident}, "\n"; + #return $self->{node}->{source}->{ident}; + + }; + + +#print Dumper($self->{node}); + + # build callback map for helper function + #my $cbmap = { $self->{meta}->{source}->{IdentProvider}->{arg} => $cb_dispatcher }; + my $cbmap = {}; + my $value = refexpr2perlref($self->{node}->{source}->{payload}, $self->{node}->{source}->{propcache}->{property}, $cbmap); + $self->{node}->{source}->{propcache}->{value} = $value; + } # encode values dependent on type of underlying storage here - expand cases... my $storage_type = $self->{meta}->{target}->{storage}->{locator}->{type}; if ($storage_type eq 'DBI') { # ...for sql $self->{node}->{source}->{propcache}->{value} = quotesql($self->{node}->{source}->{propcache}->{value}); - } elsif ($storage_type eq 'Tangram') { + } + elsif ($storage_type eq 'Tangram') { + # iso? utf8 already possible? + } elsif ($storage_type eq 'LDAP') { # TODO: encode utf8 here? } @@ -606,35 +816,6 @@ } } -#print "self->{entry}: ", Dumper($self->{node}), "\n"; exit; - - # for transferring deeply nested structures described by expressions - # this currently does not work! - # TODO: re-enable this! - if ($self->{args}->{mappingV2}) { - - # apply mapping from $self->{args}->{mappingV2} to $self->{node}->{map} - foreach my $mapStep (@{$self->{args}->{mappingV2}}) { - - # prepare left/right keys/values - my $left_key = $mapStep->{left}; - my $left_val = _resolveMapStepExpr( $self->{node}->{source}->{payload}, $mapStep->{left} ); - my $right_key = $mapStep->{right}; - my $right_val = ( $mapStep->{right} ); - #print "map: $map_right", "\n"; - - if ($mapStep->{method}) { - if ($mapStep->{method} eq 'v:1') { - $left_val = $left_key; - } - } - - #$self->{node}->{map}->{$key} = $value; - #if ( grep(!/$right_key/, @{$self->{args}->{exclude}}) ) { - $self->{node}->{map}->{$right_key} = $self->{R}->quoteSql($left_val); - #} - } - } # TODO: $logger->dump( ... ); #$logger->debug( "sqlmap:" . "\n" . Dumper($self->{node}->{map}) ); @@ -708,6 +889,8 @@ } } + + #print Dumper($self->{meta}); # DBI speaks SQL if ($self->{meta}->{$descent}->{storage}->{locator}->{type} eq 'DBI') { @@ -721,6 +904,10 @@ #print $action, "\n"; #$action = "anc"; #print "yai", "\n"; + +#print Dumper($map); +#delete $map->{cs}; + if (lc($action) eq 'insert') { $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_INSERT'); } elsif (lc $action eq 'update') { @@ -728,12 +915,17 @@ $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_UPDATE', $crit); } - #print "sql: ", $sql_main, "\n"; - #exit; +#$sql_main = "UPDATE currencies_csv SET oid='abcdef' WHERE text='Australian Dollar' AND key='AUD';"; +#$sql_main = "UPDATE currencies_csv SET oid='huhu2' WHERE ekey='AUD'"; + +#print "sql: ", $sql_main, "\n"; +#exit; # transfer data my $sqlHandle = $self->{meta}->{$descent}->{storage}->sendCommand($sql_main); +#exit; + # handle errors if ($sqlHandle->err) { #if ($self->{args}->{debug}) { print "sql-error with statement: $sql_main", "\n"; } @@ -809,13 +1001,17 @@ hash2object($object, $map); # ... and re-update@orm. +#print Dumper($object); $self->{meta}->{$descent}->{storage}->update($object); # asymmetry: get ident after insert # TODO: # - just do this if it is an IdentAuthority # - use IdentProvider metadata here - $self->{node}->{$descent}->{ident} = $self->{meta}->{$descent}->{storage}->id($object); +#print Dumper($self->{meta}->{$descent}); + my $oid = $self->{meta}->{$descent}->{storage}->id($object); +#print "oid: $oid", "\n"; + $self->{node}->{$descent}->{ident} = $oid; } elsif (lc $action eq 'update') { @@ -912,8 +1108,15 @@ #print "\n", "Attempt to fetch entry implicitely by ident failed: no ident given! This may result in an insert if no write-protection is in the way.", "\n"; return; } + + # patch for DBD::CSV + if ($ident && $ident eq 'Null') { + return; + } - my $result = $self->{meta}->{$descent}->{storage}->sendQuery({ +#print "yai!", "\n"; + + my $query = { node => $self->{meta}->{$descent}->{node}, subnodes => [qw( cs )], criterias => [ @@ -921,26 +1124,53 @@ op => 'eq', val => $ident }, ] - }); + }; + +#print Dumper($query); + + my $result = $self->{meta}->{$descent}->{storage}->sendQuery($query); my $entry = $result->getNextEntry(); + +#print Dumper($entry); +#print "pers: " . $self->{meta}->{$descent}->{storage}->is_persistent($entry), "\n"; +#my $state = $self->{meta}->{$descent}->{storage}->_fetch_object_state($entry, { name => 'TransactionHop' } ); +#print Dumper($state); + my $status = $result->getStatus(); +#print Dumper($status); + # TODO: enhance error handling (store inside tc) #if (!$row) { # print "\n", "row error", "\n"; # next; #} - if (($status && $status->{err}) || !$entry) { - #$logger->critical( __PACKAGE__ . "->_loadNode (ident=\"$ident\") failed" ); - return; - } + + # these checks run before actually loading payload- and meta-data to node-container + + # 1st level - hard error + if ($status && $status->{err}) { + $logger->debug( __PACKAGE__ . "->_statloadNode (ident=\"$ident\") failed - hard error (that's ok): $status->{err}" ); + return; + } + + # 2nd level - logical (empty/notfound) error + if (($status && $status->{empty}) || !$entry) { + $logger->debug( __PACKAGE__ . "->_statloadNode (ident=\"$ident\") failed - logical error (that's ok)" ); + #print "no entry (logical)", "\n"; + return; + } + +#print Dumper($entry); + # was: # $self->{node}->{$descent}->{ident} = $ident; # is: - # TODO: re-resolve ident from entry via metadata "IdentProvider" + # TODO: re-resolve ident from entry via metadata "IdentProvider" here - like elsewhere $self->{node}->{$descent}->{ident} = $ident; $self->{node}->{$descent}->{payload} = $entry; + } return 1; @@ -964,9 +1194,11 @@ $self->{meta}->{source}->{IdentProvider}->{arg} => $ident_new, cs => $self->{node}->{target}->{checksum}, }; - #print Dumper($map); - #print Dumper($self->{node}); - #exit; + +#print Dumper($map); +#print Dumper($self->{node}); +#exit; + $self->_modifyNode('source', 'update', $map); } @@ -1048,6 +1280,10 @@ } my $crit = join ' AND ', @crits; print "p" if $self->{verbose}; + +#print Dumper($map); +#print Dumper($crit); + $self->_modifyNode($descent, 'update', $map, $crit); $i++; } @@ -1063,12 +1299,20 @@ # TODO: handle this in an abstract way (wipe out use of 'source' and/or 'target' inside core) sub _otherSide { my $self = shift; - my $side = shift; - return 'source' if $side eq 'target'; - return 'target' if $side eq 'source'; + my $descent = shift; + return 'source' if $descent eq 'target'; + return 'target' if $descent eq 'source'; return ''; } +sub _erase_all { + my $self = shift; + my $descent = shift; + #my $node = shift; + my $node = $self->{meta}->{$descent}->{node}; + $self->{meta}->{$descent}->{storage}->eraseAll($node); +} + =pod