#!/usr/bin/env perl
#
# dbinstkeys - guess primary and foreign keys in a database instance
#
# $Id$

# note: on MDB, we get the error message
#   DBD::ODBC::st execute failed: [Microsoft][ODBC Microsoft Access Driver]
#   The field is too small to accept the amount of data you attempted to add.
#   Try inserting or pasting less data. (SQL-22001)
#   at /cygdrive/f/scripts/squimp/dbinstkeys line 520.
# which according to
#   http://support.microsoft.com/kb/896950
# is due to the presence of MEMO fields in a SELECT DISTINCT clause
# (the engine truncates the values before comparing and this is apparently
# its clumsy way of letting us know)

use warnings;
use strict;

use Getopt::Std;
use File::Spec::Functions 'rel2abs';
use DBI;
use Text::CSV;

my %opt;
getopts( 'haAcC:D:f:k:p:sS:t:T:u:vx03', \%opt );

my $print_nonkeys     = $opt{a} // $opt{A} // 0;
my $link_to_nonkeys   = $opt{A} // 0;
my $add_column_counts = $opt{c};
my $driver            = $opt{D};
my $catalogs          = $opt{C};
my $min_pk_count      = $opt{k} // 2;
my $min_fk_count      = $opt{f} // 2;
my $ign_nulls         = $opt{0};
my $intersect         = $opt{x};
my $add_primkeys      = $opt{s};
my $schemas           = $opt{S} // '%';
my $tables            = $opt{T} // '%';
my $outsep            = $opt{t} // ',';
my $verbose           = $opt{v} // 0;
my $add_count_column  = $opt{3};

my $rx_schemas = rx_for_likearg($schemas);
my $rx_tables  = rx_for_likearg($tables);

my @drivers = DBI->available_drivers();
push( @drivers, 'MDB' ) if grep { $_ eq 'ODBC' } @drivers;    # guessing

$opt{h} and HELP_MESSAGE();

defined($driver)
  or die "missing -D option, value must be one of ",
  join( ' ', @drivers ), "\n see -h for help\n";
grep { $driver eq $_ } (@drivers)
  or die "invalid value for -D option, must be one of ",
  join( ' ', @drivers ), "\n see -h for help\n";
!$min_pk_count && $link_to_nonkeys
  and die "conflicting options: -k 0 and -A\n see -h for help\n";
!$min_fk_count && $link_to_nonkeys
  and die "conflicting options: -f 0 and -A\n see -h for help\n";

my $csvw = Text::CSV->new(
    {
        sep_char    => $outsep,
        eol         => $/,
        quote_space => 0,
        ( defined $opt{q} ? ( quote_char => $opt{q} ) : () )
    }
) or die "fatal error: the CSV parser doesn't work (not installed?):\n$!\n";

sub print_row
{
    $csvw->print( *STDOUT, \@_ );
}

sub ask    # slightly modified from http://www.perlmonks.org/?node_id=886306
{
    use Term::ReadKey;
    my ( $prompt, $repl ) = @_;
    my $pword;
    my $key;

    open( my $out_fh, '>', '/dev/tty' ) or die "Cannot write to tty: $!\n";
    my $stdout = *STDOUT;
    select($out_fh);
    local $| = 1;    # Turn off output buffering for immediate response
    print "$prompt: ";
    ReadMode 2;      # Change to Raw Mode, disable Ctrl-C

    while (1) {
        while ( !defined( $key = ReadKey(-1) ) ) { }
        if ( $key =~ /^[\r\n]/ ) {    # if Enter was pressed...
            print $key;               # print a newline
            last;                     # and get out of here
        }
        print( $repl // $key );
        $pword .= $key;
    }

    ReadMode 0;    # Reset tty mode before exiting. <==IMPORTANT
    select($stdout);
    return $pword;
}

my $user     = $opt{u} // '';
my $password = $opt{p} // ask( 'Password', '*' );

my $dsn_prefix = '';
my $jet = ( $driver eq 'MDB' );    # remember this: MDB is mapped to ODBC below
if ($jet) {    # MDB is not really a driver, mostly a shorthand
    $driver     = 'ODBC';
    $dsn_prefix = 'Driver=Microsoft Access Driver (*.mdb);Dbq=';
}

@ARGV or die "you must supply an argument, try -h for help\n";

foreach my $dsn (@ARGV) {
    process_dsn($dsn);
}

exit(0);

#--- auxiliaries ---#
#
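# Example invocation (illustrative only; 'example.db' and the table and
# column names below are hypothetical):
#
#   dbinstkeys -D SQLite -p '' example.db    # -p '' skips the password prompt
#
# For every inclusion found, one CSV row is printed with the candidate
# foreign-key column first and the referenced (candidate key) column second,
# e.g. a row pairing orders.customer_id with customers.id when every
# customer_id value also occurs in customers.id.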
sub HELP_MESSAGE
{
    my $driver_names = join( ', ', sort @drivers );
    print <<ZZ;
usage: $0 [options] dsn ...

where each dsn is a DBI data source name without the dbi:<driver>: part.

Options:

-u username  The user to connect as.
-p password  The password for the user.  If this option is missing, the
             password is asked for interactively.
-D driver    Which DBI driver to use (this option is required).
             Must be one of $driver_names.
             Please note that only SQLite, CSV and MDB have been tested,
             and only once, and that CSV does not correctly report
             primary keys.
-C catalogs  Which catalogs to include; an SQL LIKE argument.
             By default, an undefined value is used.
-S schemas   Which schemas to include; an SQL LIKE argument.
             By default, % (which matches everything) is used.
             Some drivers such as CSV do not really support schemas.
-T tables    Which tables to include; an SQL LIKE expression.
             By default, % (which matches everything) is used.
-0           Ignore all NULL values when determining foreign keys.
             Without this option, columns with NULL values will not be
             considered.
-k nr        The minimum number of distinct values we require in a column
             before it will be considered as a possible candidate key.
             Defaults to 2.  If 0, the meaning changes: only explicitly
             marked primary keys consisting of a single column will be used.
-f nr        The minimum number of distinct values we require in a column
             before it will be considered as a possible foreign key.
             Defaults to 2.  If 0, the meaning changes: only explicitly
             marked foreign keys consisting of single columns will be used.
-x           Intersect.  Instead of requiring that the 'foreign key' values
             are a subset of the 'primary key' values, only require that
             their intersection isn't empty.
-t sep       The separator to use in the output CSV file.
             Defaults to a comma.
-c           Suffix the name of each column with its number of different
             values.
-3           Add a third column with the number of different values in both
             columns.  This creates an unheadered, strictly 3-column CSV
             table in which possible primary keys are represented with
             self-loops.
-a           Output all columns, not just candidate keys.  Note that
             without this option, any tables without single-column
             candidate keys will not be present in the output at all.
             Note that the -f and -k options still take effect to filter
             out columns.
-A           Output inclusion dependencies to all columns, not just to
             candidate keys.  Implies -a.
-s           Separately output a row for each candidate key found.
             In the default format, it just contains the column; with -3,
             it contains the same column twice.
-v           Issue some progress messages; useful for debugging.
ZZ
    exit(0);
}

sub warn_v
{
    warn @_, "\n" if $verbose;
}

sub rx_for_likearg
{
    my @rx = split( /([_%])/, $_[0], -1 );
    my $rx = '^'
      . join( '',
        map { $_ eq '_' ? '.' : $_ eq '%' ? '.*' : quotemeta($_) } @rx )
      . '$';
    qr($rx);
}
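# For illustration (hypothetical patterns): rx_for_likearg('ORD%') yields
# qr(^ORD.*$) and rx_for_likearg('T_MP') yields qr(^T.MP$); all other
# characters go through quotemeta, so regex metacharacters in the LIKE
# argument are matched literally.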
sub process_dsn
{
    my ($dsn) = @_;
    if ( -f $dsn ) {
        $dsn = cyg2win( rel2abs($dsn) );    # uses forward slashes
    }
    if (
        my $dbh = DBI->connect(
            "dbi:$driver:$dsn_prefix$dsn",
            $user, $password, { RaiseError => 1, LongTruncOk => 1 }
        )
      )
    {
        warn_v "connected to database dbi:$driver:$dsn_prefix$dsn";
        if ( $driver eq 'CSV' ) {
            warn_v
              "warning: CSV does not correctly find candidate primary keys";

            # provide sane table names
            my $ti_sth = $dbh->table_info( $catalogs, $schemas, $tables );
            # the arguments are ignored for now, but OK
            while ( my $ti = $ti_sth->fetchrow_arrayref() ) {
                my $tab = $ti->[2];
                $dbh->{csv_tables}{ sanitized_name($tab) }->{file} = $tab;
            }
            $ti_sth->finish();
        }
        process_dbh($dbh);
        $dbh->disconnect;
    }
    else {
        warn_v "cannot open the $driver database $dsn: $DBI::errstr";
    }
}

sub cyg2win
# convert paths starting with /cygdrive to Win32 equivalents
# this implementation is experimental;
# avoiding chomp($_=`cygpath -w $_`)
{
    local ($_) = $_[0];
    s#^/cygdrive/(\w+)#\U$1:#;
    $_;
}

sub sanitized_name
{
    my $c = $_[0];
    $c =~ s/\W/_/g;
    $c;
}

sub column_info_hashrefs2array
# converts a list of hashrefs to a list of arrayrefs
{
    map {
        [
            $_->{TABLE_CAT},  $_->{TABLE_SCHEM}, $_->{TABLE_NAME},
            $_->{COLUMN_NAME}, $_->{TYPE_NAME}
        ]
    } @_;
}

sub column_info
# returns an array of references to arrays ( $cat, $sch, $tab, $col )
{
    my ($dbh) = @_;
    die if defined($catalogs) && $catalogs eq '';
    if ($jet)    # set for the MDB shorthand; should be a full test for Jet connections
    {
        # $dbh->column_info will fail with an error message; avoid it
    }
    elsif ( my $ci_sth = $dbh->column_info( $catalogs, $schemas, $tables, '%' ) )
    {
        # the expected case
        my $ci = $ci_sth->fetchall_arrayref( {} );  # {} makes members hash refs
        $ci_sth->finish();
        warn_v "column_info has returned ", scalar(@$ci), " columns";
        return column_info_hashrefs2array(@$ci)
          if @$ci;    # SQL Server returns 0 here :-(
    }
    warn_v "column_info didn't work!";

    # this happens for DBD::CSV, DBD::ODBC to Jet, MySQL, probably others
    # the following workaround works for DBD::CSV and DBD::ODBC with
    #my $ti_sth = $dbh->table_info( $catalogs, $schemas, $tables, 'TABLE' );
    # works for neither - crashes for DBD::ODBC, in fact
    my $ti_sth = $dbh->table_info( undef, undef, undef, 'TABLE' );
    my @ci;
    foreach my $r (
        column_info_hashrefs2array( @{ $ti_sth->fetchall_arrayref( {} ) } ) )
    {
        my ( $cat, $sch, $tab, $type ) = @$r;

        # apply the -S and -T filters because table_info won't do it
        my $q_tablename = $dbh->quote_identifier( $cat, $sch, $tab );
        if ( !table_passes( $sch, $tab ) ) {
            warn_v "skipping table $q_tablename, "
              . "mismatches schemas rx $rx_schemas or tables $rx_tables";
            next;
        }
        warn_v "including table $q_tablename";

        # DBD::CSV doesn't support the inclusion of schema names in queries
        # return a $sch of undef to mark this fact
        #my $qh = $dbh->prepare( sprintf( 'SELECT * FROM "%s" LIMIT 0', $tab ) );
        # Jet doesn't accept LIMIT, apparently
        my $q = sprintf( 'SELECT * FROM %s WHERE 0=1', $q_tablename );
        warn_v "preparing: $q";
        my $qh = $dbh->prepare($q);
        $qh->execute();
        foreach my $colname ( @{ $qh->{NAME} } ) {
            push( @ci, [ $cat, $sch, $tab, $colname ] );
        }
        $qh->finish();
    }
    $ti_sth->finish();
    @ci;
}

sub table_passes
{
    my ( $sch, $tab, $q_name ) = @_;
    if ( defined($sch) && $sch !~ /$rx_schemas/ ) {
        return 0;
    }
    if ( defined($tab) && $tab !~ /$rx_tables/ ) {
        return 0;
    }
    1;
}
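# process_dbh implements the actual guessing:
#  1. count the distinct values of every column selected by -C/-S/-T;
#  2. per table, mark columns that could be link destinations (explicitly
#     declared primary keys with -k 0, otherwise columns with at least -k
#     distinct values and, unless -A is given, no duplicates);
#  3. for every pair of columns, report an inclusion dependency when the
#     candidate foreign-key column's values form a subset of (or, with -x,
#     overlap with) the destination column's values.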
sub process_dbh
{
    my ($dbh) = @_;
    my @ci = column_info($dbh);

    # assume that @ci lists tables and columns in order
    # create a data structure for holding the information
    my %cat2sch2tab2col2info;
    foreach my $ci (@ci) {
        #print_row(fully_quoted_column($dbh,@$ci));    # a 1-element row
        # would print *all* columns, and won't support -c
        my ( $cat, $sch, $tab, $col ) = @$ci;
        my $sql = select_distinct_count( $dbh, $col, $cat, $sch, $tab );

        # the next DBD::CSV problem: DISTINCT takes no effect here
        # I'm giving up on CSV for now!
        # I have scripts to translate it to SQLite anyway!
        my $q_col = fully_quoted_column( $dbh, @$ci );
        my $nr_col_vals = eval { query_to_scalar( $dbh, $sql ) } // do {
            warn_v "cannot count $q_col";
            next;
        };    # e.g. the IMAGE column type in SQL Server will cause this
        warn_v "$q_col has $nr_col_vals distinct values";
        next if ( $nr_col_vals < $min_fk_count || $nr_col_vals < $min_pk_count );
        my @ci2use = map { $_ // '' } @$ci;
        warn_v "column info: ", join( ', ', @ci2use );
        $cat2sch2tab2col2info{ $ci2use[0] }->{ $ci2use[1] }->{ $ci2use[2] }
          ->{ $ci2use[3] }->{count} = $nr_col_vals;
    }

    while ( my ( $cat, $sch2tab2col2info ) = each %cat2sch2tab2col2info ) {
        $cat = undef if $cat eq '';    # quote_identifier doesn't accept ''
        while ( my ( $sch, $tab2col2info ) = each %$sch2tab2col2info ) {
            $sch = undef if $sch eq '';    # see above

            # determine single-column candidate link destinations for each table
            while ( my ( $tab, $col2info ) = each %$tab2col2info ) {
                my $q_tablename = $dbh->quote_identifier( $cat, $sch, $tab );
                my $nr_rows =
                  query_to_scalar( $dbh,
                    select_count( $dbh, '*', $cat, $sch, $tab ) )
                  or next;
                #warn "$q_tablename has $nr_rows rows\n";

                # mark potential reported destinations
                if ( !$min_pk_count ) {
                    # only use explicitly marked single-column primary keys
                    my @primary_key = $dbh->primary_key( $cat, $sch, $tab );
                    if ( @primary_key == 1 ) {
                        $col2info->{$_}->{is_pot_dest} = 1 for @primary_key;
                    }
                    elsif ( !@primary_key ) {
                        warn_v "no explicit primary key, omitted: $q_tablename";
                    }
                    else {
                        warn_v "multicolumn primary key, omitted: $q_tablename";
                    }
                }
                else {
                    # all columns with at least -k values
                    # and no duplicates unless -A
                    foreach my $col ( keys %$col2info ) {
                        my $nr_col_vals = $col2info->{$col}->{count};
                        if ( $nr_col_vals >= $min_pk_count
                            && ( $link_to_nonkeys || $nr_col_vals == $nr_rows ) )
                        {
                            $col2info->{$col}->{is_pot_dest} = 1;
                        }
                    }
                }

                if ($add_primkeys) {
                    # report (candidate) primary keys and/or all columns
                    foreach my $col ( keys %$col2info ) {
                        next
                          if ( !$print_nonkeys
                            && !$col2info->{$col}->{is_pot_dest} );
                        my $fqc =
                          fully_quoted_column( $dbh, $cat, $sch, $tab, $col );
                        my $nr_col_vals = $col2info->{$col}->{count};
                        if ($add_column_counts) {
                            $fqc = "$fqc:$nr_col_vals";
                        }
                        if ($add_count_column) {
                            print_row( $fqc, $fqc, $nr_col_vals );  # 3-element row
                        }
                        else {
                            print_row($fqc);                        # 1-element row
                        }
                    }
                }
            }

            if ( !$min_fk_count ) {
                # only report explicit foreign keys!
                # see http://search.cpan.org/dist/DBI/DBI.pm#foreign_key_info
                my $sth =
                  $dbh->foreign_key_info( $cat, $sch, undef, $cat, $sch, undef )
                  or die
                  "no support in this database for fetching foreign key info\n";
                while ( my $row = $sth->fetchrow_arrayref() ) {
                    my ( $pcat, $psch, $ptab, $pcol, $fcat, $fsch, $ftab, $fcol )
                      = @$row
                      or next;
                    if ( !eq2( $psch, $sch ) || !eq2( $fsch, $sch ) ) {
                        # note that we don't really support catalogues,
                        # but we do need to support schemas when defined
                        # (e.g. for MySQL)
                        next;
                    }
                    # pass the referenced (primary key) side first, then the
                    # referencing side, as report_colpair() expects
                    report_colpair( $tab2col2info, $dbh, $cat, $sch, $ptab,
                        $pcol, $ftab, $fcol );
                }
                next;
            }
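            # Worked example (hypothetical data): if orders.customer_id has
            # 120 distinct non-NULL values and every one of them also occurs
            # among the 500 distinct values of customers.id, then
            # orders.customer_id -> customers.id is reported; with -x it is
            # already reported when the two value sets merely overlap.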
            # determine single-to-single-column inclusions (subset
            # relationships) (or, if -x, overlaps) for each pair of tables
            my @tables = keys %$tab2col2info;
            foreach my $tab1 (@tables) {
                my $col1info = $tab2col2info->{$tab1};
                my @keycol1s =
                  grep { $col1info->{$_}->{is_pot_dest} } keys %$col1info
                  or next;
                foreach my $tab2 (@tables) {
                    my $col2info = $tab2col2info->{$tab2};
                    foreach my $col2 ( keys %$col2info ) {
                        my $nr_rows2 = $col2info->{$col2}->{count} // next;
                        # the count may be undefined, see above
                        $nr_rows2 >= $min_fk_count or next;
                        # we can't do this test sooner
                        # because nonkeys may be included
                        foreach my $col1 (@keycol1s) {
                            next if $col1 eq $col2 && $tab1 eq $tab2;

                            # output $col2 -> $col1 iff $col2 \subseteq $col1
                            # optimization: a subset can't be larger than its
                            # superset (only valid for the subset test, so
                            # don't skip under -x)
                            next
                              if !$intersect
                              && $nr_rows2 > $col1info->{$col1}->{count};

                            # whew ... now we actually have to find out
                            # whether values($col2) \subseteq values($col1)
                            # or (with -x)
                            # values($col1) \cap values($col2) \neq \emptyset

                            # compute the sets only once:
                            # cache them in ...->{value21}
                            if ( !defined $col1info->{$col1}->{value21} ) {
                                my @cv = column_values( $dbh, $cat, $sch,
                                    $tab1, $col1 );
                                #warn_v "column values: ",
                                #  join( ', ', map { $_ // '(NULL)' } @cv );
                                $col1info->{$col1}->{has_null} =
                                  scalar( grep { !defined $_ } @cv );
                                $col1info->{$col1}->{value21} =
                                  { map { $_ => 1 } grep { defined $_ } @cv };
                                warn_v "found ",
                                  scalar(
                                    keys %{ $col1info->{$col1}->{value21} } ),
                                  " values for $tab1.$col1";
                            }
                            if ( !defined $col2info->{$col2}->{value21} ) {
                                my @cv = column_values( $dbh, $cat, $sch,
                                    $tab2, $col2 );
                                #warn_v "column values: ",
                                #  join( ', ', map { $_ // '(NULL)' } @cv );
                                $col2info->{$col2}->{has_null} =
                                  scalar( grep { !defined $_ } @cv );
                                $col2info->{$col2}->{value21} =
                                  { map { $_ => 1 } grep { defined $_ } @cv };
                                warn_v "found ",
                                  scalar(
                                    keys %{ $col2info->{$col2}->{value21} } ),
                                  " values for $tab2.$col2";
                            }

                            # the following tests are performed too often,
                            # will do for now
                            next
                              if !(
                                $ign_nulls
                                || (  !$col1info->{$col1}->{has_null}
                                    && !$col2info->{$col2}->{has_null} )
                              );

                            if ($intersect) {
                                # require values($col1) \cap values($col2)
                                # \neq \emptyset
                                warn_v
                                  "determining nr keys in both $col1 and $col2";
                                if (
                                    (
                                        my $nr_in_both = nr_keys_in_both(
                                            $col1info->{$col1}->{value21},
                                            $col2info->{$col2}->{value21}
                                        )
                                    ) > 0
                                  )
                                {
                                    #warn_v "nr keys in both $col1 and $col2: $nr_in_both";
                                    report_colpair( $tab2col2info, $dbh, $cat,
                                        $sch, $tab1, $col1, $tab2, $col2,
                                        $nr_in_both );
                                }
                                #else { warn_v "no keys in both $col1 and $col2"; }
                            }
                            else {
                                # require values($col2) \subseteq values($col1),
                                # which could be done with nr_keys_in_both(),
                                # but this is faster
                                if (
                                    keys_include_keys(
                                        $col1info->{$col1}->{value21},
                                        $col2info->{$col2}->{value21}
                                    )
                                  )
                                {
                                    report_colpair( $tab2col2info, $dbh, $cat,
                                        $sch, $tab1, $col1, $tab2, $col2 );
                                }
                                #else { warn "$tab2.$col2 -/-> $tab1.$col1\n"; }
                            }
                        }
                    }
                }
            }
        }
    }
    \%cat2sch2tab2col2info;
}

sub eq2    # eq without warnings on undef
{
    my ( $a, $b ) = @_;
    !defined($a) ? !defined($b) : ( defined($b) && $a eq $b );
}

sub report_colpair
{
    my ( $tab2col2info, $dbh, $cat, $sch, $tab1, $col1, $tab2, $col2,
        $nr_in_both )
      = @_;
    my $col1info = $tab2col2info->{$tab1};
    my $col2info = $tab2col2info->{$tab2};
    my $c1       = $col1info->{$col1}->{count} // 0;
    my $c2       = $col2info->{$col2}->{count} // 0;
    if ($intersect) {
        warn_v "$tab2.$col2 -($c2)-N($nr_in_both)-($c1)-> $tab1.$col1";
    }
    else {
        warn_v "$tab2.$col2 -($c2)-FK-($c1)-> $tab1.$col1";
        $nr_in_both = $c1;
    }
    push(
        @{ $col2info->{$col2}->{fkey_for} },
        { table => $tab1, column => $col1 }
    );
    my $fqc1 = fully_quoted_column( $dbh, $cat, $sch, $tab1, $col1 );
    my $fqc2 = fully_quoted_column( $dbh, $cat, $sch, $tab2, $col2 );
    if ($add_column_counts) {
        $fqc1 = "$fqc1:$c1";
        $fqc2 = "$fqc2:$c2";
    }
    if ($add_count_column) {
        print_row( $fqc2, $fqc1, $nr_in_both );    # 3-element row
    }
    else {
        print_row( $fqc2, $fqc1 );                 # 2-element row
    }
}

sub query_to_scalar
{
    my $dbh = shift(@_);
    warn_v "preparing: @_";
    my $qh = $dbh->prepare(@_);
    #warn_v "executing: @_";
    eval { $qh->execute() };
    $@ and do { warn_execute_failed(@_); return undef; };
    my $result = ( $qh->fetchrow_array() )[0];
    $qh->finish();
    $result;
}

sub warn_execute_failed
{
    warn "executing '", join( ', ', @_ ), "' failed: $@\n";
}

sub query_to_column
{
    my $dbh = shift(@_);
    warn_v "preparing: @_";
    my $qh = $dbh->prepare(@_);
    #warn "executing: @_\n";
    eval { $qh->execute() };
    $@ and do { warn_execute_failed(@_); return undef; };
    my @result = map { $_->[0] } @{ $qh->fetchall_arrayref() };
    $qh->finish();
    @result;
}

sub column_values
{
    my ( $dbh, $cat, $sch, $tab, $col ) = @_;
    my $sql = select_distinct( $dbh, $col, $cat, $sch, $tab );
    query_to_column( $dbh, $sql );
}

sub select_distinct
{
    sprintf( 'SELECT DISTINCT %s FROM %s', quoted_column_and_table(@_) );
}

sub select_count
{
    # the remaining argument(s) must be valid for quote_identifier()
    sprintf( 'SELECT COUNT(%s) FROM %s', quoted_column_and_table(@_) );
}

sub select_distinct_count
{
    # the remaining argument(s) must be valid for quote_identifier()
    sprintf(
        $jet    # set for the MDB shorthand; still a poor test for Jet connections
        ? 'SELECT COUNT(*) FROM (SELECT DISTINCT %s FROM %s)'
        : 'SELECT COUNT(DISTINCT %s) FROM %s',
        quoted_column_and_table(@_)
    );
}
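# For illustration, with hypothetical identifiers: for a column id in a
# table orders, select_distinct_count() produces something like
#   SELECT COUNT(DISTINCT "id") FROM "orders"
# or, for an Access/Jet connection,
#   SELECT COUNT(*) FROM (SELECT DISTINCT "id" FROM "orders")
# (the actual quoting comes from the driver's quote_identifier).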
sub quoted_column_and_table
{
    my ($dbh) = shift(@_);
    my ($col) = shift(@_);
    #$col = $dbh->quote_identifier($col) if $col !~ /[^a-z]/i;
    # quote $col only if it is a simple identifier - it may be a keyword
    # why was this? I do need to quote column names with spaces etc.
    $col = $dbh->quote_identifier($col) if $col ne '*';
    ( $col, $dbh->quote_identifier(@_) );
}

sub fully_quoted_column
{
    my ($col) = pop(@_);
    my ($dbh) = shift(@_);
    my ( $q_col, $q_tab ) = quoted_column_and_table( $dbh, $col, @_ );
    join( '.', $q_tab, $q_col );
}

sub keys_include_keys
{
    my ( $hashref1, $hashref2 ) = @_;
    #warn_v "do these ", scalar( keys %$hashref1 ), " keys include those ",
    #  scalar( keys %$hashref2 ), " keys?";
    foreach my $k2 ( keys %$hashref2 ) {
        return 0
          if ( !defined $k2 || !defined $hashref1->{$k2} );  # no NULL in keys
    }
    1;
}

sub nr_keys_in_both
# the number of keys defined in both hashrefs
# (without looking at their values)
{
    my ( $hashref1, $hashref2 ) = @_;
    my $nr_keys_in_both = 0;
    foreach my $k2 ( keys %$hashref2 ) {
        next if ( !defined $k2 || !defined $hashref1->{$k2} ); # no NULL in keys
        ++$nr_keys_in_both;
    }
    $nr_keys_in_both;
}
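# For example (made-up data): with %a = ( 1 => 1, 2 => 1, 3 => 1 ) and
# %b = ( 2 => 1, 3 => 1 ), keys_include_keys( \%a, \%b ) is true because
# every key of %b also occurs in %a, and nr_keys_in_both( \%a, \%b ) is 2.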