--- nfo/perl/libs/Data/Transfer/Sync.pm 2002/12/01 04:43:25 1.2 +++ nfo/perl/libs/Data/Transfer/Sync.pm 2002/12/23 07:10:59 1.11 @@ -1,4 +1,4 @@ -## $Id: Sync.pm,v 1.2 2002/12/01 04:43:25 joko Exp $ +## $Id: Sync.pm,v 1.11 2002/12/23 07:10:59 joko Exp $ ## ## Copyright (c) 2002 Andreas Motl ## @@ -6,6 +6,37 @@ ## ## ---------------------------------------------------------------------------------------- ## $Log: Sync.pm,v $ +## Revision 1.11 2002/12/23 07:10:59 joko +## + using MD5 for checksum generation again - the 32-bit integer hash from DBI seems to be too lazy +## +## Revision 1.10 2002/12/19 01:07:16 joko +## + fixed output done via $logger +## +## Revision 1.9 2002/12/16 07:02:34 jonen +## + added comment +## +## Revision 1.8 2002/12/15 02:03:09 joko +## + fixed logging-messages +## + additional metadata-checks +## +## Revision 1.7 2002/12/13 21:49:34 joko +## + sub configure +## + sub checkOptions +## +## Revision 1.6 2002/12/06 04:49:10 jonen +## + disabled output-puffer here +## +## Revision 1.5 2002/12/05 08:06:05 joko +## + bugfix with determining empty fields (Null) with DBD::CSV +## + debugging +## + updated comments +## +## Revision 1.4 2002/12/03 15:54:07 joko +## + {import}-flag is now {prepare}-flag +## +## Revision 1.3 2002/12/01 22:26:59 joko +## + minor cosmetics for logging +## ## Revision 1.2 2002/12/01 04:43:25 joko ## + mapping deatil entries may now be either an ARRAY or a HASH ## + erase flag is used now (for export-operations) @@ -28,6 +59,8 @@ use warnings; use Data::Dumper; +#use Hash::Merge qw( merge ); + use misc::HashExt; use libp qw( md5_base64 ); use libdb qw( quotesql hash2Sql ); @@ -37,20 +70,37 @@ # get logger instance my $logger = Log::Dispatch::Config->instance; +$| = 1; sub new { my $invocant = shift; my $class = ref($invocant) || $invocant; - my $self = { @_ }; + my $self = {}; $logger->debug( __PACKAGE__ . "->new(@_)" ); bless $self, $class; - $self->_init(); + $self->configure(@_); return $self; } +sub configure { + my $self = shift; + my @args = @_; + if (!isEmpty(\@args)) { + my %properties = @_; + # merge args to properties + map { $self->{$_} = $properties{$_}; } keys %properties; + $self->_init(); + } else { + #print "no args!", "\n"; + } + #print Dumper($self); +} + sub _init { my $self = shift; + + $self->{configured} = 1; # build new container if necessary $self->{container} = Data::Storage::Container->new() if !$self->{container}; @@ -69,12 +119,109 @@ } +sub prepareOptions { + + my $self = shift; + my $opts = shift; + +#print Dumper($opts); + + $opts->{mode} ||= ''; + $opts->{erase} ||= 0; + #$opts->{import} ||= 0; + + $logger->notice( __PACKAGE__ . "->prepareOptions( source_node $opts->{source_node} mode $opts->{mode} erase $opts->{erase} prepare $opts->{prepare} )"); + + if (!$opts->{mapping} || !$opts->{mapping_module}) { + $logger->warning( __PACKAGE__ . "->prepareOptions: No mapping supplied - please check key 'mappings' in BizWorks/Config.pm"); + } + + my $evstring = "use $opts->{mapping_module};"; + eval($evstring); + if ($@) { + $logger->warning( __PACKAGE__ . "->prepareOptions: error while trying to access mapping - $@"); + return; + } + + # resolve mapping metadata (returned from sub) + my $mapObject = $opts->{mapping_module}->new(); + #print Dumper($map); + my $source_node_name = $opts->{source_node}; + # check if mapping for certain node is contained in mapping object + if (!$mapObject->can($source_node_name)) { + $logger->warning( __PACKAGE__ . "->prepareOptions: Can't access mapping for node \"$source_node_name\" - please check $opts->{mapping_module}."); + return; + } + my $map = $mapObject->$source_node_name; + + # remove asymmetries from $map (patch keys) + $map->{source_node} = $map->{source}; delete $map->{source}; + $map->{target_node} = $map->{target}; delete $map->{target}; + $map->{mapping} = $map->{details}; delete $map->{details}; + $map->{direction} = $map->{mode}; delete $map->{mode}; + + # defaults (mostly for backward-compatibility) + $map->{source_node} ||= $source_node_name; + $map->{source_ident} ||= 'storage_method:id'; + $map->{target_ident} ||= 'property:oid'; + $map->{direction} ||= $opts->{mode}; # | PUSH | PULL | FULL + $map->{method} ||= 'checksum'; # | timestamp + $map->{source_exclude} ||= [qw( cs )]; + + # merge map to opts + map { $opts->{$_} = $map->{$_}; } keys %$map; + +#print Dumper($opts); + + # TODO: move this to checkOptions... + + # check - do we have a target? + if (!$opts->{target_node}) { + $logger->warning( __PACKAGE__ . "->prepareOptions: No target given - please check metadata declaration."); + return; + } + + + #return $opts; + return 1; + +} + + +sub checkOptions { + my $self = shift; + my $opts = shift; + + my $result = 1; + + # check - do we have a target node? + if (!$opts->{target_node}) { + $logger->warning( __PACKAGE__ . "->checkOptions: Error while resolving resource metadata - no 'target node' could be determined."); + $result = 0; + } + + # check - do we have a mapping? + if (!$opts->{mapping} && !$opts->{mapping_module}) { + $logger->warning( __PACKAGE__ . "->checkOptions: Error while resolving resource metadata - no 'mapping' could be determined."); + $result = 0; + } + + return $result; + +} + + # TODO: some feature to show off the progress of synchronization (cur/max * 100) sub syncNodes { my $self = shift; my $args = shift; + if (!$self->{configured}) { + $logger->critical( __PACKAGE__ . "->syncNodes: Synchronization object is not configured/initialized correctly." ); + return; + } + # remember arguments through the whole processing $self->{args} = $args; @@ -97,7 +244,7 @@ } # decompose identifiers for each partner - # TODO: take this list from already established/given metadata + # TODO: refactor!!! take this list from already established/given metadata foreach ('source', 'target') { # get/set metadata for further processing @@ -152,6 +299,8 @@ #print "iiiiisprov: ", Dumper($self->{meta}->{$_}->{storage}), "\n"; } +#print Dumper($self->{meta}); + $logger->info( __PACKAGE__ . "->syncNodes: source=$self->{meta}->{source}->{dbkey}/$self->{meta}->{source}->{node} $direction_arrow target=$self->{meta}->{target}->{dbkey}/$self->{meta}->{target}->{node}" ); # build mapping @@ -179,14 +328,37 @@ } +#print Dumper($self->{meta}); + # check partners/nodes: does partner exist / is node available? foreach my $partner (keys %{$self->{meta}}) { - next if $self->{meta}->{$partner}->{storage}->{locator}->{type} eq 'DBI'; # for DBD::CSV - re-enable for others + + # 1. check partners & storages + if (!$self->{meta}->{$partner}) { + $logger->critical( __PACKAGE__ . "->syncNodes: Could not find partner '$partner' in configuration metadata." ); + return; + } + + my $dbkey = $self->{meta}->{$partner}->{dbkey}; + + if (!$self->{meta}->{$partner}->{storage}) { + $logger->critical( __PACKAGE__ . "->syncNodes: Could not access storage of partner '$partner' (named '$dbkey'), looks like a configuration-error." ); + return; + } + + # TODO: + # 2. check if partners (and nodes?) are actually available.... + # eventually pre-check mode of access-attempt (read/write) here to provide an "early-croak" if possible + + # 3. check nodes + next if $self->{meta}->{$partner}->{storage}->{locator}->{type} eq 'DBI'; # HACK for DBD::CSV - re-enable for others + # get node-name my $node = $self->{meta}->{$partner}->{node}; if (!$self->{meta}->{$partner}->{storage}->existsChildNode($node)) { - $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach \"$node\" at \"$partner\"." ); + $logger->critical( __PACKAGE__ . "->syncNodes: Could not reach node \"$node\" at partner \"$partner\"." ); return; } + } # TODO: @@ -210,7 +382,7 @@ # import flag means: prepare the source node to be syncable # this is useful if there are e.g. no "ident" or "checksum" columns yet inside a DBI like (row-based) storage - if ($self->{args}->{import}) { + if ($self->{args}->{prepare}) { $self->_prepareNode_MetaProperties('source'); $self->_prepareNode_DummyIdent('source'); #return; @@ -249,7 +421,7 @@ $results ||= $self->_getNodeList('source', $filter); } - # get reference to node list from convenient method provided by corehandle + # get reference to node list from convenient method provided by CORE-HANDLE #$results ||= $self->{source}->getListUnfiltered($self->{meta}->{source}->{node}); #$results ||= $self->{meta}->{source}->{storage}->getListUnfiltered($self->{meta}->{source}->{node}); $results ||= $self->_getNodeList('source'); @@ -327,14 +499,18 @@ my $identOK = $self->_resolveNodeIdent('source'); #if (!$identOK && lc $self->{args}->{direction} ne 'import') { if (!$identOK) { - $logger->critical( __PACKAGE__ . "->syncNodes: Can not synchronize: No ident found in source node, maybe try to \"import\" this node first." ); + #print Dumper($self->{meta}->{source}); + $logger->critical( __PACKAGE__ . "->syncNodes: No ident found in source node \"$self->{meta}->{source}->{node}\", try to \"prepare\" this node first?" ); return; } #print "statload", "\n"; #print "ident: ", $self->{node}->{source}->{ident}, "\n"; +#print Dumper($self->{node}); my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident}); + +#print Dumper($self->{node}); # mark node as new either if there's no ident or if stat/load failed if (!$statOK) { @@ -421,6 +597,7 @@ $tc->{attempt_new}++; $self->_doTransferToTarget('insert'); # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?) + #print Dumper($self->{node}); $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1); $self->_readChecksum('target'); @@ -446,11 +623,11 @@ # change ident in source (take from target), if transfer was ok and target is an IdentAuthority # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) { + print "r" if $self->{verbose}; #print Dumper($self->{meta}); #print Dumper($self->{node}); #exit; $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident}); - print "r" if $self->{verbose}; } print ":" if $self->{verbose}; @@ -460,7 +637,7 @@ print "\n" if $self->{verbose}; # build user-message from some stats - my $msg = "stats: $tc"; + my $msg = "statistics: $tc"; if ($tc->{error_per_row}) { $msg .= "\n"; @@ -477,6 +654,7 @@ } +# refactor this as some core-function to do a generic dump resolving data-encapsulations of e.g. Set::Object sub _dumpCompact { my $self = shift; @@ -488,30 +666,35 @@ my $item = {}; foreach my $key (keys %$_) { my $val = $_->{$key}; + +#print Dumper($val); + if (ref $val eq 'Set::Object') { #print "========================= SET", "\n"; - #print Dumper($val); +#print Dumper($val); #print Dumper($val->members()); #$val = $val->members(); #$vars->[$count]->{$key} = $val->members() if $val->can("members"); #$item->{$key} = $val->members() if $val->can("members"); $item->{$key} = $val->members(); #print Dumper($vars->[$count]->{$key}); + } else { $item->{$key} = $val; } + } push @data, $item; $count++; } -#print "Dump:", "\n"; -#print Dumper(@data); +#print "Dump:", Dumper(@data), "\n"; $Data::Dumper::Indent = 0; my $result = Dumper(@data); $Data::Dumper::Indent = 2; return $result; + } @@ -535,10 +718,15 @@ #$logger->dump( __PACKAGE__ . ": " . $dump ); # calculate checksum from dump + # note: the 32-bit integer hash from DBI seems + # to generate duplicates with small payloads already in ranges of hundreds of items/rows!!! + # try to avoid to use it or try to use it only for payloads greater than, hmmm, let's say 30 chars? + # (we had about 15 chars average per item (row)) + # md5-based fingerprint, base64 encoded (from Digest::MD5) - #my $checksum_cur = md5_base64($objdump) . '=='; + $self->{node}->{$descent}->{checksum} = md5_base64($dump) . '=='; # 32-bit integer "hash" value (maybe faster?) (from DBI) - $self->{node}->{$descent}->{checksum} = DBI::hash($dump, 1); + #$self->{node}->{$descent}->{checksum} = DBI::hash($dump, 1); # signal good return 1; @@ -741,6 +929,8 @@ } } + + #print Dumper($self->{meta}); # DBI speaks SQL if ($self->{meta}->{$descent}->{storage}->{locator}->{type} eq 'DBI') { @@ -754,6 +944,10 @@ #print $action, "\n"; #$action = "anc"; #print "yai", "\n"; + +#print Dumper($map); +#delete $map->{cs}; + if (lc($action) eq 'insert') { $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_INSERT'); } elsif (lc $action eq 'update') { @@ -761,12 +955,17 @@ $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_UPDATE', $crit); } - #print "sql: ", $sql_main, "\n"; - #exit; +#$sql_main = "UPDATE currencies_csv SET oid='abcdef' WHERE text='Australian Dollar' AND key='AUD';"; +#$sql_main = "UPDATE currencies_csv SET oid='huhu2' WHERE ekey='AUD'"; + +#print "sql: ", $sql_main, "\n"; +#exit; # transfer data my $sqlHandle = $self->{meta}->{$descent}->{storage}->sendCommand($sql_main); +#exit; + # handle errors if ($sqlHandle->err) { #if ($self->{args}->{debug}) { print "sql-error with statement: $sql_main", "\n"; } @@ -842,13 +1041,17 @@ hash2object($object, $map); # ... and re-update@orm. +#print Dumper($object); $self->{meta}->{$descent}->{storage}->update($object); # asymmetry: get ident after insert # TODO: # - just do this if it is an IdentAuthority # - use IdentProvider metadata here - $self->{node}->{$descent}->{ident} = $self->{meta}->{$descent}->{storage}->id($object); +#print Dumper($self->{meta}->{$descent}); + my $oid = $self->{meta}->{$descent}->{storage}->id($object); +#print "oid: $oid", "\n"; + $self->{node}->{$descent}->{ident} = $oid; } elsif (lc $action eq 'update') { @@ -945,8 +1148,15 @@ #print "\n", "Attempt to fetch entry implicitely by ident failed: no ident given! This may result in an insert if no write-protection is in the way.", "\n"; return; } + + # patch for DBD::CSV + if ($ident && $ident eq 'Null') { + return; + } - my $result = $self->{meta}->{$descent}->{storage}->sendQuery({ +#print "yai!", "\n"; + + my $query = { node => $self->{meta}->{$descent}->{node}, subnodes => [qw( cs )], criterias => [ @@ -954,26 +1164,53 @@ op => 'eq', val => $ident }, ] - }); + }; + +#print Dumper($query); + + my $result = $self->{meta}->{$descent}->{storage}->sendQuery($query); my $entry = $result->getNextEntry(); + +#print Dumper($entry); +#print "pers: " . $self->{meta}->{$descent}->{storage}->is_persistent($entry), "\n"; +#my $state = $self->{meta}->{$descent}->{storage}->_fetch_object_state($entry, { name => 'TransactionHop' } ); +#print Dumper($state); + my $status = $result->getStatus(); +#print Dumper($status); + # TODO: enhance error handling (store inside tc) #if (!$row) { # print "\n", "row error", "\n"; # next; #} - if (($status && $status->{err}) || !$entry) { - #$logger->critical( __PACKAGE__ . "->_loadNode (ident=\"$ident\") failed" ); - return; - } + + # these checks run before actually loading payload- and meta-data to node-container + + # 1st level - hard error + if ($status && $status->{err}) { + $logger->debug( __PACKAGE__ . "->_statloadNode (ident=\"$ident\") failed - hard error (that's ok): $status->{err}" ); + return; + } + + # 2nd level - logical (empty/notfound) error + if (($status && $status->{empty}) || !$entry) { + $logger->debug( __PACKAGE__ . "->_statloadNode (ident=\"$ident\") failed - logical error (that's ok)" ); + #print "no entry (logical)", "\n"; + return; + } + +#print Dumper($entry); + # was: # $self->{node}->{$descent}->{ident} = $ident; # is: - # TODO: re-resolve ident from entry via metadata "IdentProvider" + # TODO: re-resolve ident from entry via metadata "IdentProvider" here - like elsewhere $self->{node}->{$descent}->{ident} = $ident; $self->{node}->{$descent}->{payload} = $entry; + } return 1; @@ -997,9 +1234,11 @@ $self->{meta}->{source}->{IdentProvider}->{arg} => $ident_new, cs => $self->{node}->{target}->{checksum}, }; - #print Dumper($map); - #print Dumper($self->{node}); - #exit; + +#print Dumper($map); +#print Dumper($self->{node}); +#exit; + $self->_modifyNode('source', 'update', $map); } @@ -1081,6 +1320,10 @@ } my $crit = join ' AND ', @crits; print "p" if $self->{verbose}; + +#print Dumper($map); +#print Dumper($crit); + $self->_modifyNode($descent, 'update', $map, $crit); $i++; }