--- nfo/perl/libs/Data/Transfer/Sync/Core.pm 2003/01/19 01:23:04 1.1 +++ nfo/perl/libs/Data/Transfer/Sync/Core.pm 2004/06/19 01:45:08 1.12 @@ -1,11 +1,51 @@ -## $Id: Core.pm,v 1.1 2003/01/19 01:23:04 joko Exp $ +## ------------------------------------------------------------------------- +## +## $Id: Core.pm,v 1.12 2004/06/19 01:45:08 joko Exp $ ## ## Copyright (c) 2002 Andreas Motl ## ## See COPYRIGHT section in pod text below for usage and distribution rights. ## -## ---------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------- ## $Log: Core.pm,v $ +## Revision 1.12 2004/06/19 01:45:08 joko +## introduced "local checksum"-mechanism +## moved _dumpCompact to ::Compare::Checksum +## +## Revision 1.11 2004/05/11 20:03:48 jonen +## bugfix[joko] related to Attribute Map +## +## Revision 1.10 2003/06/25 23:03:57 joko +## no debugging +## +## Revision 1.9 2003/05/13 08:17:52 joko +## buildAttributeMap now propagates error +## +## Revision 1.8 2003/03/27 15:31:15 joko +## fixes to modules regarding new namespace(s) below Data::Mungle::* +## +## Revision 1.7 2003/02/21 08:01:11 joko +## debugging, logging +## renamed module +## +## Revision 1.6 2003/02/14 14:03:49 joko +## + logging, debugging +## - refactored code to sister module +## +## Revision 1.5 2003/02/11 05:30:47 joko +## + minor fixes and some debugging mud +## +## Revision 1.4 2003/02/09 05:01:10 joko +## + major structure changes +## - refactored code to sister modules +## +## Revision 1.3 2003/01/20 17:01:14 joko +## + cosmetics and debugging +## + probably refactored code to new plugin-modules 'Metadata.pm' and/or 'StorageInterface.pm' (guess it was the last one...) +## +## Revision 1.2 2003/01/19 02:05:42 joko +## - removed pod-documentation: now in Data/Transfer/Sync.pod +## ## Revision 1.1 2003/01/19 01:23:04 joko ## + new from Data/Transfer/Sync.pm ## @@ -41,7 +81,7 @@ ## + minor cosmetics for logging ## ## Revision 1.2 2002/12/01 04:43:25 joko -## + mapping deatil entries may now be either an ARRAY or a HASH +## + mapping detail entries may now be either an ARRAY or a HASH ## + erase flag is used now (for export-operations) ## + expressions to refer to values inside deep nested structures ## - removed old mappingV2-code @@ -53,7 +93,7 @@ ## ## Revision 1.1 2002/10/10 03:44:21 cvsjoko ## + new -## ---------------------------------------------------------------------------------------- +## ------------------------------------------------------------------------- package Data::Transfer::Sync::Core; @@ -68,13 +108,13 @@ use Data::Dumper; -use misc::HashExt; -use libp qw( md5_base64 ); -use libdb qw( quotesql hash2Sql ); -use Data::Transform::Deep qw( hash2object refexpr2perlref ); -use Data::Compare::Struct qw( getDifference isEmpty ); +#use misc::HashExt; +use Hash::Serializer; +use Data::Mungle::Compare::Struct qw( getDifference isEmpty ); +use Data::Mungle::Transform::Deep qw( deep_copy expand ); use Data::Storage::Container; use DesignPattern::Object; +use shortcuts::database qw( quotesql ); # get logger instance my $logger = Log::Dispatch::Config->instance; @@ -105,12 +145,18 @@ $self->{container}->addStorage($_, $self->{storages}->{$_}); } + # trace + #print Dumper($self); + #exit; + return 1; } sub _initV1 { my $self = shift; + $logger->debug( __PACKAGE__ . "->_initV1" ); + die("this should not be reached!"); # tag storages with id-authority and checksum-provider information # TODO: better store tag inside metadata to hold bits together! map { $self->{container}->{storage}->{$_}->{isIdentAuthority} = 1 } @{$self->{id_authorities}}; @@ -119,146 +165,18 @@ } -sub _preCheckOptions { - - my $self = shift; - my $opts = shift; - -#print Dumper($opts); -#exit; - - # the type of the to-be-synced item - if (!$opts->{source}->{nodeType}) { - $logger->error( __PACKAGE__ . "->_preCheckOptions failed: Please specify \"source-type\"."); - return; - } - # the name of the (container-) node the items are listed in - if (!$opts->{source}->{nodeName}) { - $logger->error( __PACKAGE__ . "->_preCheckOptions failed: Please specify \"source-node\"."); - return; - } - - # a "map"-declaration which module to use for mapping- and/or lookup-purposes - if (!$opts->{map}) { - $logger->warning( __PACKAGE__ . "->_preCheckOptions: No mapping supplied - please check key 'map|mappings' in global configuration or specify additional argument '--mapping-module'."); - return; - } - if (!$opts->{map}->{moduleName}) { - $logger->warning( __PACKAGE__ . "->_preCheckOptions: Currently only perl-modules can provide mappings: Please specify one with '--mapping-module'."); - return; - } - - return 1; - -} - - - - - - - - - -sub _buildFieldmappingV1 { - my $self = shift; - - # build mapping - # incoming: and Array of node map entries (Array or Hash) - e.g. - # [ 'source:item_name' => 'target:class_val' ] - # { source => 'event->startDateTime', target => 'begindate' } - foreach (@{$self->{args}->{mapping}}) { - if (ref $_ eq 'ARRAY') { - my @entry1 = split(':', $_->[0]); - my @entry2 = split(':', $_->[1]); - my $descent = []; - my $node = []; - $descent->[0] = $entry1[0]; - $descent->[1] = $entry2[0]; - $node->[0] = $entry1[1]; - $node->[1] = $entry2[1]; - push @{$self->{meta}->{$descent->[0]}->{childnodes}}, $node->[0]; - push @{$self->{meta}->{$descent->[1]}->{childnodes}}, $node->[1]; - } elsif (ref $_ eq 'HASH') { - foreach my $entry_key (keys %$_) { - my $entry_val = $_->{$entry_key}; - push @{$self->{meta}->{$entry_key}->{childnodes}}, $entry_val; - } - } - - } - -} - -sub _buildMetadataV1 { - my $self = shift; - - # decompose identifiers for each partner - # TODO: refactor!!! take this list from already established/given metadata - foreach ('source', 'target') { - - # get/set metadata for further processing - - # Partner and Node (e.g.: "L:Country" or "R:countries.csv") - if (my $item = $self->{args}->{$_}) { - my @item = split(':', $item); - $self->{meta}->{$_}->{dbkey} = $item[0]; - $self->{meta}->{$_}->{node} = $item[1]; - } - - # Filter - if (my $item_filter = $self->{args}->{$_ . '_filter'}) { - $self->{meta}->{$_}->{filter} = $item_filter; - } - - # IdentProvider - if (my $item_ident = $self->{args}->{$_ . '_ident'}) { - my @item_ident = split(':', $item_ident); - $self->{meta}->{$_}->{IdentProvider} = { method => $item_ident[0], arg => $item_ident[1] }; - } - -#print Dumper($self->{meta}); - - # TODO: ChecksumProvider - - # exclude properties/subnodes - if (my $item_exclude = $self->{args}->{$_ . '_exclude'}) { - $self->{meta}->{$_}->{subnodes_exclude} = $item_exclude; - } - - # TypeProvider - if (my $item_type = $self->{args}->{$_ . '_type'}) { - my @item_type = split(':', $item_type); - $self->{meta}->{$_}->{TypeProvider} = { method => $item_type[0], arg => $item_type[1] }; - } - - # Callbacks - writers (will be triggered _before_ writing to target) - if (my $item_writers = $self->{args}->{$_ . '_callbacks_write'}) { - my $descent = $_; # this is important since the following code inside the map wants to use its own context variables - map { $self->{meta}->{$descent}->{Callback}->{write}->{$_}++; } @$item_writers; - } - - # Callbacks - readers (will be triggered _after_ reading from source) - if (my $item_readers = $self->{args}->{$_ . '_callbacks_read'}) { - my $descent = $_; - map { $self->{meta}->{$descent}->{Callback}->{read}->{$_}++; } @$item_readers; - } - - # resolve storage objects - #$self->{$_} = $self->{container}->{storage}->{$self->{meta}->{$_}->{dbkey}}; - # relink references to metainfo - $self->{meta}->{$_}->{storage} = $self->{container}->{storage}->{$self->{meta}->{$_}->{dbkey}}; - #print "iiiiisprov: ", Dumper($self->{meta}->{$_}->{storage}), "\n"; - } - -} - # TODO: abstract the hardwired use of "source" and "target" in here somehow - hmmmm....... /(="§/%??? -sub _syncNodes { +sub _run { my $self = shift; - my $tc = OneLineDumpHash->new( {} ); + #print "verbose: ", $self->{verbose}, "\n"; + $self->{verbose} = 1; + + $logger->debug( __PACKAGE__ . "->_run" ); + + # for statistics + my $tc = Hash::Serializer->new( {} ); my $results; # set of objects is already in $self->{args} @@ -273,36 +191,53 @@ } # get reference to node list from convenient method provided by CORE-HANDLE - #$results ||= $self->{source}->getListUnfiltered($self->{meta}->{source}->{node}); - #$results ||= $self->{meta}->{source}->{storage}->getListUnfiltered($self->{meta}->{source}->{node}); $results ||= $self->_getNodeList('source'); # checkpoint: do we actually have a list to iterate through? if (!$results || !@{$results}) { - $logger->notice( __PACKAGE__ . "->syncNodes: No nodes to synchronize." ); + $logger->notice( __PACKAGE__ . "->_run: No nodes to synchronize." ); + return; + } + + #print Dumper(@$results); + #exit; + + # check if we actually *have* a synchronization method + if (!$self->{options}->{metadata}->{syncMethod}) { + $logger->critical( __PACKAGE__ . "->_run: No synchronization method (checksum|timestamp) specified" ); return; } + # dereference my @results = @{$results}; + #print Dumper(@results); # iterate through set foreach my $source_node_real (@results) { + print ":" if $self->{verbose}; + + #print Dumper($source_node_real); + $tc->{total}++; -#print "======================== iter", "\n"; + #print "=" x 80, "\n"; # clone object (in case we have to modify it here) # TODO: # - is a "deep_copy" needed here if occouring modifications take place? # - puuhhhh, i guess a deep_copy would destroy tangram mechanisms? # - after all, just take care for now that this object doesn't get updated! + # - so, just use its reference for now - if some cloning is needed in future, do this here! my $source_node = $source_node_real; + #my $source_node = expand($source_node_real); # modify entry - handle new style callbacks (the readers) -#print Dumper($source_node); -#exit; + + # trace + #print Dumper($source_node); + #exit; my $descent = 'source'; @@ -310,22 +245,32 @@ my $map_callbacks = {}; if (my $callbacks = $self->{meta}->{$descent}->{Callback}) { + # trace + #print Dumper($callbacks); + #exit; + my $error = 0; foreach my $node (keys %{$callbacks->{read}}) { + #print "cb_node: $node", "\n"; + my $object = $source_node; my $value; # = $source_node->{$node}; + # trace + #print Dumper($self->{options}); + # ------------ half-redundant: make $self->callCallback($object, $value, $opts) - my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $node . '_read'; + #my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $node . '_read'; + my $perl_callback = $self->{meta}->{$descent}->{nodeType} . '::' . $node . '_read'; my $evalstring = 'return ' . $perl_callback . '( { object => $object, property => $node, value => $value, storage => $self->{meta}->{$descent}->{storage} } );'; #print $evalstring, "\n"; exit; my $cb_result = eval($evalstring); if ($@) { - die $@; $error = 1; - print $@, "\n"; + $logger->error( __PACKAGE__ . "->_run: $@" ); + next; } # ------------ half-redundant: make $self->callCallback($object, $value, $opts) @@ -335,7 +280,8 @@ } -#print Dumper($source_node); + # trace + #print Dumper($source_node); # exclude defined fields (simply delete from object) map { delete $source_node->{$_} } @{$self->{meta}->{source}->{subnodes_exclude}}; @@ -344,57 +290,49 @@ $self->{node} = {}; $self->{node}->{source}->{payload} = $source_node; -#print "res - ident", "\n"; + # trace + #print Dumper($self->{node}); + #exit; # determine ident of entry my $identOK = $self->_resolveNodeIdent('source'); #if (!$identOK && lc $self->{args}->{direction} ne 'import') { if (!$identOK) { #print Dumper($self->{meta}->{source}); - $logger->critical( __PACKAGE__ . "->syncNodes: No ident found in source node \"$self->{meta}->{source}->{node}\", try to \"prepare\" this node first?" ); + $logger->critical( __PACKAGE__ . "->_run: No ident found in source node ( nodeName='$self->{meta}->{source}->{nodeName}', nodeType='$self->{meta}->{source}->{nodeType}') try to \"prepare\" this node first?" ); return; } -#print "statload", "\n"; -#print "ident: ", $self->{node}->{source}->{ident}, "\n"; -#print Dumper($self->{node}); - + #print "l" if $self->{verbose}; my $statOK = $self->_statloadNode('target', $self->{node}->{source}->{ident}); -#print Dumper($self->{node}); - # mark node as new either if there's no ident or if stat/load failed if (!$statOK) { $self->{node}->{status}->{new} = 1; print "n" if $self->{verbose}; } -#print "checksum", "\n"; - # determine status of entry by synchronization method - if ( (lc $self->{args}->{method} eq 'checksum') ) { + if ( lc $self->{options}->{metadata}->{syncMethod} eq 'checksum' ) { #if ( $statOK && (lc $self->{args}->{method} eq 'checksum') ) { #if ( !$self->{node}->{status}->{new} && (lc $self->{args}->{method} eq 'checksum') ) { - # TODO: - # is this really worth a "critical"??? - # no - it should just be a debug appendix i believe - -#print "readcs", "\n"; - - # calculate checksum of source node + # calculate local checksum of source node + $self->handleLocalChecksum('source'); + + # calculate checksum of source node #$self->_calcChecksum('source'); - if (!$self->_readChecksum('source')) { - $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" ); + if (!$self->readChecksum('source')) { + $logger->warning( __PACKAGE__ . "->_run: Could not find \"source\" entry with ident=\"$self->{node}->{source}->{ident}\"" ); $tc->{skip}++; print "s" if $self->{verbose}; next; } # get checksum from synchronization target - $self->_readChecksum('target'); - #if (!$self->_readChecksum('target')) { - # $logger->critical( __PACKAGE__ . "->_readChecksum: Could not find \"target\" entry with ident=\"$self->{node}->{source}->{ident}\"" ); + $self->readChecksum('target'); + #if (!$self->readChecksum('target')) { + # $logger->critical( __PACKAGE__ . "->readChecksum: Could not find \"target\" entry with ident=\"$self->{node}->{source}->{ident}\"" ); # next; #} @@ -407,6 +345,14 @@ # determine if entry is "new" or "dirty" # after all, this seems to be the point where the hammer falls..... print "c" if $self->{verbose}; + + # trace + #print Dumper($self->{node}); + #exit; + #print "LOCAL: ", $self->{node}->{source}->{checksum_local_storage}, " <-> ", $self->{node}->{source}->{checksum_local_calculated}, "\n"; + #print "REMOTE: ", $self->{node}->{source}->{checksum}, " <-> ", $self->{node}->{target}->{checksum}, "\n"; + + # calculate new/dirty status $self->{node}->{status}->{new} = !$self->{node}->{target}->{checksum}; if (!$self->{node}->{status}->{new}) { $self->{node}->{status}->{dirty} = @@ -416,6 +362,24 @@ $self->{args}->{force}; } + # new 2004-06-17: also check if local checksum is inconsistent + if ($self->{node}->{source}->{checksum_local_storage} ne $self->{node}->{source}->{checksum_local_calculated}) { + $self->{node}->{status}->{dirty_local} = 1; + $self->{node}->{status}->{dirty} = 1; + } + + } else { + $logger->warning( __PACKAGE__ . "->_run: Synchronization method '$self->{options}->{metadata}->{syncMethod}' is not implemented" ); + $tc->{skip}++; + print "s" if $self->{verbose}; + next; + } + + # new 2004-06-17: also update local checksum + if ($self->{node}->{status}->{dirty_local}) { + $tc->{locally_modified}++; + print "[lm]" if $self->{verbose}; + $self->_doModify_LocalChecksum('source'); } # first reaction on entry-status: continue with next entry if the current is already "in sync" @@ -424,25 +388,37 @@ next; } - # build map to actually transfer the data from source to target - $self->_buildMap(); - + #print Dumper($self->{node}->{source}); -#print Dumper($self->{node}); exit; + # build map to actually transfer the data from source to target + if (!$self->buildAttributeMap()) { + #$logger->warning( __PACKAGE__ . "->_run: Attribute Map could not be created. Will not insert or modify node."); + push( @{$tc->{error_per_row}}, "Attribute Map could not be created. Will not insert or modify node $self->{node}->{source}->{ident}."); + #push( @{$tc->{error_per_row}}, "Attribute Map could not be created. Will not insert or modify node " . Dumper($self->{node}->{source}) . "."); + $tc->{error}++; + print "e" if $self->{verbose}; + next; + } -#print "attempt", "\n"; + # trace + #print Dumper($self->{node}); exit; + #print "attempt", "\n"; # additional (new) checks for feature "write-protection" if ($self->{meta}->{target}->{storage}->{isWriteProtected}) { $tc->{attempt_transfer}++; print "\n" if $self->{verbose}; - $logger->notice( __PACKAGE__ . "->syncNodes: Target is write-protected. Will not insert or modify node. " . + $logger->notice( __PACKAGE__ . "->_run: Target is write-protected. Will not insert or modify node. " . "(Ident: $self->{node}->{source}->{ident} " . "Dump:\n" . Dumper($self->{node}->{source}->{payload}) . ")" ); print "\n" if $self->{verbose}; $tc->{skip}++; next; } + # trace + #print Dumper($self); + #exit; + # transfer contents of map to target if ($self->{node}->{status}->{new}) { $tc->{attempt_new}++; @@ -450,14 +426,14 @@ # asymmetry: refetch node from target to re-calculate new ident and checksum (TODO: is IdentAuthority of relevance here?) #print Dumper($self->{node}); $self->_statloadNode('target', $self->{node}->{target}->{ident}, 1); - $self->_readChecksum('target'); + $self->readChecksum('target'); } elsif ($self->{node}->{status}->{dirty}) { $tc->{attempt_modify}++; # asymmetry: get ident before updating (TODO: is IdentAuthority of relevance here?) $self->{node}->{target}->{ident} = $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}}; $self->_doTransferToTarget('update'); - $self->_readChecksum('target'); + $self->readChecksum('target'); } if ($self->{node}->{status}->{ok}) { @@ -471,17 +447,23 @@ print "e" if $self->{verbose}; } + # trace + #print Dumper($self); + #exit; + # change ident in source (take from target), if transfer was ok and target is an IdentAuthority # this is (for now) called a "retransmit" indicated by a "r"-character when verbosing - if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{storage}->{isIdentAuthority}) { + #if ($self->{node}->{status}->{ok} && $self->{options}->{target}->{storage}->{idAuthority}) { + if ($self->{node}->{status}->{ok} && $self->{meta}->{target}->{isIdentAuthority}) { print "r" if $self->{verbose}; #print Dumper($self->{meta}); #print Dumper($self->{node}); #exit; $self->_doModifySource_IdentChecksum($self->{node}->{target}->{ident}); } - - print ":" if $self->{verbose}; + + #print "UNLOAD", "\n"; + #$self->{meta}->{source}->{storage}->unload( $self->{node}->{source}->{payload} ); } @@ -498,582 +480,27 @@ # todo!!! #sysevent( { usermsg => $msg, level => $level }, $taskEvent ); - $logger->info( __PACKAGE__ . "->syncNodes: $msg" ); + #$logger->info( __PACKAGE__ . "->_run: $msg" ); + $logger->info($msg . "\n"); return $tc; } -# refactor this as some core-function to do a generic dump resolving data-encapsulations of e.g. Set::Object -sub _dumpCompact { - my $self = shift; - - #my $vars = \@_; - my @data = (); - - my $count = 0; - foreach (@_) { - my $item = {}; - foreach my $key (keys %$_) { - my $val = $_->{$key}; - -#print Dumper($val); - - if (ref $val eq 'Set::Object') { - #print "========================= SET", "\n"; -#print Dumper($val); - #print Dumper($val->members()); - #$val = $val->members(); - #$vars->[$count]->{$key} = $val->members() if $val->can("members"); - #$item->{$key} = $val->members() if $val->can("members"); - $item->{$key} = $val->members(); - #print Dumper($vars->[$count]->{$key}); - - } else { - $item->{$key} = $val; - } - - } - push @data, $item; - $count++; - } - -#print "Dump:", Dumper(@data), "\n"; - - $Data::Dumper::Indent = 0; - my $result = Dumper(@data); - $Data::Dumper::Indent = 2; - return $result; - -} - - -sub _calcChecksum { - - my $self = shift; - my $descent = shift; - my $specifier = shift; - - # calculate checksum for current object - my $ident = $self->{node}->{$descent}->{ident}; - - # build dump of this node - my $payload = $self->{node}->{$descent}->{payload}; - #my $dump = $ident . "\n" . $item->quickdump(); - #my $dump = $ident . "\n" . Dumper($item); - my $dump = $ident . "\n" . $self->_dumpCompact($payload); - - # TODO: $logger->dump( ... ); - #$logger->debug( __PACKAGE__ . ": " . $dump ); - #$logger->dump( __PACKAGE__ . ": " . $dump ); - - # calculate checksum from dump - # note: the 32-bit integer hash from DBI seems - # to generate duplicates with small payloads already in ranges of hundreds of items/rows!!! - # try to avoid to use it or try to use it only for payloads greater than, hmmm, let's say 30 chars? - # (we had about 15 chars average per item (row)) - - # md5-based fingerprint, base64 encoded (from Digest::MD5) - $self->{node}->{$descent}->{checksum} = md5_base64($dump) . '=='; - # 32-bit integer "hash" value (maybe faster?) (from DBI) - #$self->{node}->{$descent}->{checksum} = DBI::hash($dump, 1); - - # signal good - return 1; - -} - - -sub _readChecksum { - my $self = shift; - - my $descent = shift; - - #print "getcheck:", "\n"; print Dumper($self->{node}->{$descent}); - - if (!$self->{node}->{$descent}) { - # signal checksum bad - return; - } - - # get checksum for current entry - # TODO: don't have the checksum column/property hardcoded as "cs" here, make this configurable somehow - - if ($self->{meta}->{$descent}->{storage}->{isChecksumAuthority}) { - #$self->{node}->{$descent}->{checksum} = $entry->{cs}; - #$self->{node}->{$descent}->{checksum} = $self->_calcChecksum($descent); # $entry->{cs}; - #print "descent: $descent", "\n"; - $self->_calcChecksum($descent); - #print "checksum: ", $self->{node}->{$descent}->{checksum}, "\n"; - } else { - - #$self->{node}->{$descent}->{checksum} = $entry->{cs}; - $self->{node}->{$descent}->{checksum} = $self->{node}->{$descent}->{payload}->{cs}; - } - - # signal checksum good - return 1; - -} - - -sub _buildMap { - - my $self = shift; - - # field-structure for building sql - # mapping of sql-fieldnames to object-attributes - $self->{node}->{map} = {}; - - # manually set ... - # ... object-id - $self->{node}->{map}->{$self->{meta}->{target}->{IdentProvider}->{arg}} = $self->{node}->{source}->{ident}; - # ... checksum - $self->{node}->{map}->{cs} = $self->{node}->{source}->{checksum}; - -#print "sqlmap: ", Dumper($self->{node}->{map}), "\n"; - - # for transferring flat structures via simple (1:1) mapping - # TODO: diff per property / property value - - if ($self->{args}->{mapping}) { - # apply mapping from $self->{args}->{mapping} to $self->{node}->{map} - #foreach my $key (@{$self->{meta}->{source}->{childnodes}}) { - my @childnodes = @{$self->{meta}->{source}->{childnodes}}; - for (my $mapidx = 0; $mapidx <= $#childnodes; $mapidx++) { - #my $map_right = $self->{args}->{mapping}->{$key}; - - $self->{node}->{source}->{propcache} = {}; - $self->{node}->{target}->{propcache} = {}; - - # get property name - $self->{node}->{source}->{propcache}->{property} = $self->{meta}->{source}->{childnodes}->[$mapidx]; - $self->{node}->{target}->{propcache}->{property} = $self->{meta}->{target}->{childnodes}->[$mapidx]; - #print "map: $map_right", "\n"; - - # get property value - my $value; - - # detect for callback - old style - (maybe the better???) - if (ref($self->{node}->{target}->{map}) eq 'CODE') { - #$value = &$map_right($objClone); - } else { - # plain (scalar?) value - #$value = $objClone->{$map_right}; - $self->{node}->{source}->{propcache}->{value} = $self->{node}->{source}->{payload}->{$self->{node}->{source}->{propcache}->{property}}; - } - #$self->{node}->{map}->{$key} = $value; - - # detect expression - # for transferring deeply nested structures described by expressions - #print "val: $self->{node}->{source}->{propcache}->{value}", "\n"; - if ($self->{node}->{source}->{propcache}->{property} =~ s/^expr://) { - - # create an anonymous sub to act as callback target dispatcher - my $cb_dispatcher = sub { - #print "=============== CALLBACK DISPATCHER", "\n"; - #print "ident: ", $self->{node}->{source}->{ident}, "\n"; - #return $self->{node}->{source}->{ident}; - - }; - - -#print Dumper($self->{node}); - - # build callback map for helper function - #my $cbmap = { $self->{meta}->{source}->{IdentProvider}->{arg} => $cb_dispatcher }; - my $cbmap = {}; - my $value = refexpr2perlref($self->{node}->{source}->{payload}, $self->{node}->{source}->{propcache}->{property}, $cbmap); - $self->{node}->{source}->{propcache}->{value} = $value; - } - - # encode values dependent on type of underlying storage here - expand cases... - my $storage_type = $self->{meta}->{target}->{storage}->{locator}->{type}; - if ($storage_type eq 'DBI') { - # ...for sql - $self->{node}->{source}->{propcache}->{value} = quotesql($self->{node}->{source}->{propcache}->{value}); - } - elsif ($storage_type eq 'Tangram') { - # iso? utf8 already possible? - - } elsif ($storage_type eq 'LDAP') { - # TODO: encode utf8 here? - } - - # store value to transfer map - $self->{node}->{map}->{$self->{node}->{target}->{propcache}->{property}} = $self->{node}->{source}->{propcache}->{value}; - - } - } - - - # TODO: $logger->dump( ... ); - #$logger->debug( "sqlmap:" . "\n" . Dumper($self->{node}->{map}) ); -#print "sqlmap: ", Dumper($self->{node}->{map}), "\n"; -#print "entrystatus: ", Dumper($self->{node}), "\n"; - -} - -sub _resolveNodeIdent { - my $self = shift; - my $descent = shift; - - #print Dumper($self->{node}->{$descent}); - - # get to the payload - #my $item = $specifier->{item}; - my $payload = $self->{node}->{$descent}->{payload}; - - # resolve method to get to the id of the given item - # we use global metadata and the given descent for this task - #my $ident = $self->{$descent}->id($item); - #my $ident = $self->{meta}->{$descent}->{storage}->id($item); - - my $ident; - my $provider_method = $self->{meta}->{$descent}->{IdentProvider}->{method}; - my $provider_arg = $self->{meta}->{$descent}->{IdentProvider}->{arg}; - - # resolve to ident - if ($provider_method eq 'property') { - $ident = $payload->{$provider_arg}; - - } elsif ($provider_method eq 'storage_method') { - #$ident = $self->{meta}->{$descent}->{storage}->id($item); - $ident = $self->{meta}->{$descent}->{storage}->$provider_arg($payload); - } - - $self->{node}->{$descent}->{ident} = $ident; - - return 1 if $ident; - -} - - -sub _modifyNode { +sub _doTransferToTarget { my $self = shift; - my $descent = shift; my $action = shift; - my $map = shift; - my $crit = shift; - - # map for new style callbacks - my $map_callbacks = {}; - - # checks go first! - - # TODO: this should be reviewed first - before extending ;-) - # TODO: this should be extended: - # count this cases inside the caller to this sub and provide a better overall message - # if this counts still zero in the end: - # "No nodes have been touched for modify: Do you have column-headers in your csv file?" - if (not defined $self->{node}) { - #$logger->critical( __PACKAGE__ . "->_modifyNode failed: \"$descent\" node is empty." ); - #return; - } - - # transfer callback nodes from value map to callback map - handle them afterwards! - (new style callbacks) - if (my $callbacks = $self->{meta}->{$descent}->{Callback}) { - foreach my $callback (keys %{$callbacks->{write}}) { - $map_callbacks->{write}->{$callback} = $map->{$callback}; - delete $map->{$callback}; - } - } - - - #print Dumper($self->{meta}); - - # DBI speaks SQL - if ($self->{meta}->{$descent}->{storage}->{locator}->{type} eq 'DBI') { - -#print Dumper($self->{node}); - my $sql_main; - # translate map to sql - #print $action, "\n"; exit; - #print $self->{meta}->{$descent}->{node}, "\n"; exit; - #print "action:"; - #print $action, "\n"; -#$action = "anc"; -#print "yai", "\n"; - -#print Dumper($map); -#delete $map->{cs}; - - if (lc($action) eq 'insert') { - $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_INSERT'); - } elsif (lc $action eq 'update') { - $crit ||= "$self->{meta}->{$descent}->{IdentProvider}->{arg}='$self->{node}->{$descent}->{ident}'"; - $sql_main = hash2Sql($self->{meta}->{$descent}->{node}, $map, 'SQL_UPDATE', $crit); - } - -#$sql_main = "UPDATE currencies_csv SET oid='abcdef' WHERE text='Australian Dollar' AND key='AUD';"; -#$sql_main = "UPDATE currencies_csv SET oid='huhu2' WHERE ekey='AUD'"; - -#print "sql: ", $sql_main, "\n"; -#exit; - - # transfer data - my $sqlHandle = $self->{meta}->{$descent}->{storage}->sendCommand($sql_main); - -#exit; - - # handle errors - if ($sqlHandle->err) { - #if ($self->{args}->{debug}) { print "sql-error with statement: $sql_main", "\n"; } - $self->{node}->{status}->{error} = { - statement => $sql_main, - state => $sqlHandle->state, - err => $sqlHandle->err, - errstr => $sqlHandle->errstr, - }; - } else { - $self->{node}->{status}->{ok} = 1; - } - - # Tangram does it the oo-way (naturally) - } elsif ($self->{meta}->{$descent}->{storage}->{locator}->{type} eq 'Tangram') { - my $sql_main; - my $object; - - # determine classname - my $classname = $self->{meta}->{$descent}->{node}; - - # properties to exclude - my @exclude = @{$self->{meta}->{$descent}->{subnodes_exclude}}; - - - if (my $identProvider = $self->{meta}->{$descent}->{IdentProvider}) { - push @exclude, $identProvider->{arg}; - } - - # new feature: - # - check TypeProvider metadata property from other side - # - use argument (arg) inside as a classname for object creation on this side - #my $otherSide = $self->_otherSide($descent); - if (my $typeProvider = $self->{meta}->{$descent}->{TypeProvider}) { - #print Dumper($map); - $classname = $map->{$typeProvider->{arg}}; - # remove nodes from map also (push nodes to "subnodes_exclude" list) - push @exclude, $typeProvider->{arg}; - } - - # exclude banned properties (remove from map) - #map { delete $self->{node}->{map}->{$_} } @{$self->{args}->{exclude}}; - map { delete $map->{$_} } @exclude; - - # list of properties - my @props = keys %{$map}; - - # transfer data - if (lc $action eq 'insert') { - - # build array to initialize object - #my @initarray = (); - #map { push @initarray, $_, undef; } @props; - - # make the object persistent in four steps: - # - raw create (perl / class tangram scope) - # - engine insert (tangram scope) ... this establishes inheritance - don't try to fill in inherited properties before! - # - raw fill-in from hash (perl scope) - # - engine update (tangram scope) ... this updates all properties just filled in - - # create new object ... - #my $object = $classname->new( @initarray ); - $object = $classname->new(); - - # ... pass to orm ... - $self->{meta}->{$descent}->{storage}->insert($object); - - # ... and initialize with empty (undef'd) properties. - #print Dumper(@props); - map { $object->{$_} = undef; } @props; - - # mix in values ... - hash2object($object, $map); - - # ... and re-update@orm. -#print Dumper($object); - $self->{meta}->{$descent}->{storage}->update($object); - - # asymmetry: get ident after insert - # TODO: - # - just do this if it is an IdentAuthority - # - use IdentProvider metadata here -#print Dumper($self->{meta}->{$descent}); - my $oid = $self->{meta}->{$descent}->{storage}->id($object); -#print "oid: $oid", "\n"; - $self->{node}->{$descent}->{ident} = $oid; - - - } elsif (lc $action eq 'update') { - - # get fresh object from orm first - $object = $self->{meta}->{$descent}->{storage}->load($self->{node}->{$descent}->{ident}); - -#print Dumper($self->{node}); - - # mix in values - #print Dumper($object); - hash2object($object, $map); - #print Dumper($object); - #exit; - $self->{meta}->{$descent}->{storage}->update($object); - } - - my $error = 0; - - # handle new style callbacks - this is a HACK - do this without an eval! - #print Dumper($map); - #print "cb: ", Dumper($self->{meta}->{$descent}->{Callback}); - #print Dumper($map_callbacks); - foreach my $node (keys %{$map_callbacks->{write}}) { - #print Dumper($node); - my $perl_callback = $self->{meta}->{$descent}->{node} . '::' . $node . '_write'; - my $evalstring = $perl_callback . '( { object => $object, value => $map_callbacks->{write}->{$node}, storage => $self->{meta}->{$descent}->{storage} } );'; - #print $evalstring, "\n"; exit; - eval($evalstring); - if ($@) { - $error = 1; - print $@, "\n"; - } - - #print "after eval", "\n"; - - if (!$error) { - # re-update@orm - $self->{meta}->{$descent}->{storage}->update($object); - } - } - - # handle errors - if ($error) { - #print "error", "\n"; -=pod - my $sqlHandle; - #if ($self->{args}->{debug}) { print "sql-error with statement: $sql_main", "\n"; } - $self->{node}->{status}->{error} = { - statement => $sql_main, - state => $sqlHandle->state, - err => $sqlHandle->err, - errstr => $sqlHandle->errstr, - }; -=cut - # rollback.... - #print "rollback", "\n"; - $self->{meta}->{$descent}->{storage}->erase($object); - #print "after rollback", "\n"; - } else { - $self->{node}->{status}->{ok} = 1; - } - - - } - -} - -# TODO: -# this should be split up into... -# - a "_statNode" (should just touch the node to check for existance) -# - a "_loadNode" (should load node completely) -# - maybe additionally a "loadNodeProperty" (may specify properties to load) -# - introduce $self->{nodecache} for this purpose -# TODO: -# should we: -# - not pass ident in here but resolve it via "$descent"? -# - refactor this and stuff it with additional debug/error message -# - this = the way the implicit load mechanism works -sub _statloadNode { - - my $self = shift; - my $descent = shift; - my $ident = shift; - my $force = shift; - - # fetch entry to retrieve checksum from - # was: - if (!$self->{node}->{$descent} || $force) { - # is: - #if (!$self->{node}->{$descent}->{item} || $force) { - - if (!$ident) { - #print "\n", "Attempt to fetch entry implicitely by ident failed: no ident given! This may result in an insert if no write-protection is in the way.", "\n"; - return; - } - - # patch for DBD::CSV - if ($ident && $ident eq 'Null') { - return; - } - -#print "yai!", "\n"; - - my $query = { - node => $self->{meta}->{$descent}->{node}, - subnodes => [qw( cs )], - criterias => [ - { key => $self->{meta}->{$descent}->{IdentProvider}->{arg}, - op => 'eq', - val => $ident }, - ] - }; - -#print Dumper($query); - - my $result = $self->{meta}->{$descent}->{storage}->sendQuery($query); - - my $entry = $result->getNextEntry(); - -#print Dumper($entry); -#print "pers: " . $self->{meta}->{$descent}->{storage}->is_persistent($entry), "\n"; -#my $state = $self->{meta}->{$descent}->{storage}->_fetch_object_state($entry, { name => 'TransactionHop' } ); -#print Dumper($state); - - my $status = $result->getStatus(); - -#print Dumper($status); - # TODO: enhance error handling (store inside tc) - #if (!$row) { - # print "\n", "row error", "\n"; - # next; - #} - - # these checks run before actually loading payload- and meta-data to node-container + # trace + #print Dumper($self->{meta}); + #print Dumper($self->{node}); + #exit; - # 1st level - hard error - if ($status && $status->{err}) { - $logger->debug( __PACKAGE__ . "->_statloadNode (ident=\"$ident\") failed - hard error (that's ok): $status->{err}" ); - return; - } - - # 2nd level - logical (empty/notfound) error - if (($status && $status->{empty}) || !$entry) { - $logger->debug( __PACKAGE__ . "->_statloadNode (ident=\"$ident\") failed - logical error (that's ok)" ); - #print "no entry (logical)", "\n"; - return; - } - -#print Dumper($entry); - - # was: - # $self->{node}->{$descent}->{ident} = $ident; - # is: - # TODO: re-resolve ident from entry via metadata "IdentProvider" here - like elsewhere - $self->{node}->{$descent}->{ident} = $ident; - $self->{node}->{$descent}->{payload} = $entry; - - } - - return 1; - -} - -sub _doTransferToTarget { - my $self = shift; - my $action = shift; $self->_modifyNode('target', $action, $self->{node}->{map}); } + sub _doModifySource_IdentChecksum { my $self = shift; my $ident_new = shift; @@ -1093,17 +520,15 @@ $self->_modifyNode('source', 'update', $map); } - -# this is a shortcut method -# ... let's try to avoid _any_ redundant code in here (ok... - at the cost of method lookups...) -sub _getNodeList { +sub _doModify_LocalChecksum { my $self = shift; my $descent = shift; - my $filter = shift; - return $self->{meta}->{$descent}->{storage}->getListFiltered($self->{meta}->{$descent}->{node}, $filter); + my $map = { + cs_local => $self->{node}->{$descent}->{checksum_local_calculated}, + }; + $self->_modifyNode($descent, 'update', $map); } - sub _prepareNode_MetaProperties { my $self = shift; my $descent = shift; @@ -1138,10 +563,12 @@ } +# TODO: load column-metadata from reversed mapping-metadata sub _prepareNode_DummyIdent { my $self = shift; my $descent = shift; + #print Dumper($self->{options}); $logger->info( __PACKAGE__ . "->_prepareNode_DummyIdent( descent $descent )" ); my $list = $self->_getNodeList($descent); @@ -1155,6 +582,7 @@ my $map = { $self->{meta}->{$descent}->{IdentProvider}->{arg} => $ident_dummy, cs => undef, + cs_local => undef, }; # diff lists and ... @@ -1196,153 +624,6 @@ return ''; } -sub _erase_all { - my $self = shift; - my $descent = shift; - #my $node = shift; - my $node = $self->{meta}->{$descent}->{node}; - $self->{meta}->{$descent}->{storage}->eraseAll($node); -} - - -=pod - - -=head1 DESCRIPTION - -Data::Transfer::Sync is a module providing a generic synchronization process -across arbitrary/multiple storages based on a ident/checksum mechanism. -It sits on top of Data::Storage. - - -=head1 REQUIREMENTS - - For full functionality: - Data::Storage - Data::Transform - Data::Compare - ... and all their dependencies - - -=head1 AUTHORS / COPYRIGHT - -The Data::Storage module is Copyright (c) 2002 Andreas Motl. -All rights reserved. - -You may distribute it under the terms of either the GNU General Public -License or the Artistic License, as specified in the Perl README file. - - -=head1 SUPPORT / WARRANTY - -Data::Storage is free software. IT COMES WITHOUT WARRANTY OF ANY KIND. - - - -=head1 BUGS - - When in "import" mode for windows file - DBD::AutoCSV may hang. - Hint: Maybe the source node contains an ident-, but no checksum-column? - - -=head1 USER LEVEL ERRORS - -=head4 Mapping - - - - - - - - - - - - - - - - - - - - - - - - - - - - - info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 )critical: BizWorks::Process::Setup->startSync: Can't access mapping for node "Currency" - please check BizWorks::ResourceMapping. - - - - - - - - - - - - - - - - - - - - - - - - - - - - You have to create a sub for each node used in synchronization inside named Perl module. The name of this sub _must_ match - the name of the node you want to sync. This sub holds mapping metadata to give the engine hints about how - to access the otherwise generic nodes. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -=head4 DBD::AutoCSV's rulebase - - - - - - - - - - - - - - - - - - - - - - - - - - - - - info: BizWorks::Process::Setup->syncResource( source_node Currency mode PULL erase 0 import 0 ) - info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv - - Execution ERROR: Error while scanning: Missing first row or scanrule not applied. at C:/home/amo/develop/netfrag.org/nfo/perl/libs/DBD/CSV.p - m line 165, line 1. - called from C:/home/amo/develop/netfrag.org/nfo/perl/libs/Data/Storage/Handler/DBI.pm at 123. - - DBI-Error: DBD::AutoCSV::st fetchrow_hashref failed: Attempt to fetch row from a Non-SELECT statement - notice: Data::Transfer::Sync->syncNodes: No nodes to synchronize. - - - - - - - - - - - - - - - - - - - - - - - - - - - - DBD::AutoCSV contains a rulebase which is spooled down while attempting to guess the style of the csv file regarding - parameters like newline (eol), column-seperation-character (sep_char), quoting character (quote_char). - If this spool runs out of entries and no style could be resolved, DBD::CSV dies causing this "Execution ERROR" which - results in a "DBI-Error" afterwards. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -=head4 Check structure of source node - - - - - - - - - - - - - - - - - - - - - - - - - - - - - info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv - critical: Data::Transfer::Sync->syncNodes: Can not synchronize: No ident found in source node, maybe try to "import" this node first. - - - - - - - - - - - - - - - - - - - - - - - - - - - - If lowlevel detection succeeds, but no other required informations are found, this message is issued. - "Other informations" might be: - - column-header-row completely missing - - ident column is empty - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -=head4 Modify structure of source node - - - - - - - - - - - - - - - - - - - - - - - - - - - - - info: Data::Transfer::Sync->syncNodes: source=L/Currency <- target=R/currencies.csv - info: Data::Transfer::Sync->_prepareNode_MetaProperties( descent source ) - warning: Data::Transfer::Sync->_prepareNode_MetaProperties: node is lacking meta properties - will try to alter... - SQL ERROR: Command 'ALTER' not recognized or not supported! - - SQL ERROR: Command 'ALTER' not recognized or not supported! - - - - - - - - - - - - - - - - - - - - - - - - - - - - The Engine found a node which structure does not match the required. It tries to alter this automatically - only when doing "import" - - but the DBD driver (in this case DBD::CSV) gets in the way croaking not to be able to do this. - This could also appear if your database connection has insufficient rights to modify the database structure. - DBD::CSV croaks because it doesn't implement the ALTER command, so please edit your columns manually. - Hint: Add columns with the names of your "ident" and "checksum" property specifications. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -=head4 Load source node by ident - - - - - - - - - - - - - - - - - - - - - - - - - - - - - info: Data::Transfer::Sync->_prepareNode_DummyIdent( descent source ) - pcritical: Data::Transfer::Sync->_modifyNode failed: "source" node is empty. - - - - - - - - - - - - - - - - - - - - - - - - - - - - The source node could not be loaded. Maybe the ident is missing. Please check manually. - Hint: Like above, the ident and/or checksum columns may be missing.... - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -=head1 TODO - - - sub _resolveIdentProvider - - wrap _doModifySource and _doTransferTarget around a core function which can change virtually any type of node - - split this module up into Sync.pm, Sync/Core.pm, Sync/Compare.pm and Sync/Compare/Checksum.pm - - introduce _compareNodes as a core method and wrap it around methods in Sync/Compare/Checksum.pm - - introduce Sync/Compare/MyComparisonImplementation.pm - - some generic deferring method - e.g. "$self->defer(action)" - to be able to accumulate a bunch of actions for later processing - - this implies everything done is _really_ split up into generic actions - how else would we defer them??? - - example uses: - - fetch whole checksum list from node - - remember source ident retransmits - - remember: this is convenient - and maybe / of course faster - but we'll loose "per-node-atomic" operations - - feature: mechanism to implicit inject checksum property to nodes (alter table / modify schema) - - expand statistics / keep track of: - - touched/untouched nodes - - full sync - - just do a push and a pull for now but use stats for touched nodes in between to speed up things - - introduce some new metadata flags for a synchronization partner which is (e.g.) of "source" or "target": - - isNewNodePropagator - - isWriteProtected - - -=cut 1; +__END__