Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for source schema and exclude/include pattern and fix numeric type cast #28

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ rvm:
before_script:
- psql -c 'create database travis_ci_test;' -U postgres
env:
- POSTGRES_TO_REDSHIFT_SOURCE_URI=postgres://postgres@localhost/travis_ci_test
- POSTGRES_TO_REDSHIFT_SOURCE_URI=postgres://postgres@localhost/travis_ci_test POSTGRES_TO_REDSHIFT_EXCLUDE_TABLE_PATTERN=exclude_table
18 changes: 9 additions & 9 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ PATH
specs:
postgres_to_redshift (0.1.2)
aws-sdk-v1 (~> 1.54)
pg (~> 0.17.0)
pg (~> 0.18.0)

GEM
remote: https://rubygems.org/
specs:
aws-sdk-v1 (1.66.0)
aws-sdk-v1 (1.67.0)
json (~> 1.4)
nokogiri (>= 1.4.4)
nokogiri (~> 1)
diff-lcs (1.2.5)
json (1.8.3)
mini_portile (0.6.2)
nokogiri (1.6.6.2)
mini_portile (~> 0.6.0)
pg (0.17.1)
json (1.8.6)
mini_portile2 (2.3.0)
nokogiri (1.8.1)
mini_portile2 (~> 2.3.0)
pg (0.18.4)
rake (10.4.2)
rspec (3.2.0)
rspec-core (~> 3.2.0)
Expand All @@ -42,4 +42,4 @@ DEPENDENCIES
rspec

BUNDLED WITH
1.10.5
1.16.0
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,25 @@ Or install it yourself as:

Set your source and target databases, as well as your s3 intermediary.

*Note: Only set `POSTGRES_TO_REDSHIFT_EXCLUDE_TABLE_PATTERN` if you want to exclude certain tables (comma-separated for multiple values).*

*Note: All tables are by default included.*

*Note: setting `DROP_TABLE_BEFORE_CREATE` to true will drop each table on the target before creating it (default: false).*

*Note: `POSTGRES_TO_REDSHIFT_SOURCE_SCHEMA` defaults to `public`.*

```bash
export POSTGRES_TO_REDSHIFT_SOURCE_URI='postgres://username:password@host:port/database-name'
export POSTGRES_TO_REDSHIFT_TARGET_URI='postgres://username:password@host:port/database-name'
export POSTGRES_TO_REDSHIFT_TARGET_SCHEMA='testing-data'
export POSTGRES_TO_REDSHIFT_SOURCE_SCHEMA='testing-data'
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth mentioning that this is 'public' by default

export S3_DATABASE_EXPORT_ID='yourid'
export S3_DATABASE_EXPORT_KEY='yourkey'
export S3_DATABASE_EXPORT_BUCKET='some-bucket-to-use'
export DROP_TABLE_BEFORE_CREATE=true
export POSTGRES_TO_REDSHIFT_EXCLUDE_TABLE_PATTERN='table-pattern-to-exclude1,table-pattern-to-exclude2'
export POSTGRES_TO_REDSHIFT_INCLUDE_TABLE_PATTERN='table-pattern-to-include1,table-pattern-to-include2'

postgres_to_redshift
```
Expand Down
53 changes: 46 additions & 7 deletions lib/postgres_to_redshift.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,34 @@ class << self
# Copies every eligible source table into the target Redshift schema.
#
# A table is skipped when its name matches any exclude filter, or when
# include filters are present and none of them match. Each surviving
# table is (optionally dropped and) created on the target, exported to
# S3, then imported.
def self.update_tables
  update_tables = PostgresToRedshift.new

  puts "exclude_filters: #{exclude_filters}"
  puts "include_filters: #{include_filters}"

  target_connection.exec("CREATE SCHEMA IF NOT EXISTS #{target_schema}")

  update_tables.tables.each do |table|
    next if exclude_filters.any? { |filter| table.name.downcase.include?(filter.downcase) }

    # `blank?` is ActiveSupport, which is not a dependency here — use `empty?`.
    next unless include_filters.empty? || include_filters.any? { |filter| table.name.downcase.include?(filter.downcase) }

    # IF EXISTS so a first run doesn't fail; quote and use target_table_name
    # to match the CREATE below.
    if drop_table_before_create
      target_connection.exec("DROP TABLE IF EXISTS #{target_schema}.#{target_connection.quote_ident(table.target_table_name)}")
    end

    target_connection.exec("CREATE TABLE IF NOT EXISTS #{target_schema}.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

    update_tables.copy_table(table)

    update_tables.import_table(table)
  end
end

# Comma-separated substrings from the environment; any table whose name
# contains one of them (case-insensitively) is skipped. Empty when unset.
def self.exclude_filters
  ENV.fetch('POSTGRES_TO_REDSHIFT_EXCLUDE_TABLE_PATTERN', '').split(',')
end

# Comma-separated substrings from the environment; when non-empty, only
# tables whose names contain one of them are copied. Empty when unset.
def self.include_filters
  ENV.fetch('POSTGRES_TO_REDSHIFT_INCLUDE_TABLE_PATTERN', '').split(',')
end

def self.source_uri
@source_uri ||= URI.parse(ENV['POSTGRES_TO_REDSHIFT_SOURCE_URI'])
end
Expand All @@ -38,6 +57,10 @@ def self.target_uri
@target_uri ||= URI.parse(ENV['POSTGRES_TO_REDSHIFT_TARGET_URI'])
end

# Whether to DROP each target table before creating it (opt-in via
# DROP_TABLE_BEFORE_CREATE=true, per the README; default false).
#
# Compare against the literal "true": previously the raw ENV string was
# returned, so any non-empty value — including "false" — was truthy and
# triggered destructive drops.
def self.drop_table_before_create
  ENV['DROP_TABLE_BEFORE_CREATE'] == 'true'
end

def self.source_connection
unless instance_variable_defined?(:"@source_connection")
@source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1])
Expand All @@ -55,10 +78,22 @@ def self.target_connection
@target_connection
end

def self.schema
# Target (Redshift) schema to load into; required — ENV.fetch raises
# KeyError when POSTGRES_TO_REDSHIFT_TARGET_SCHEMA is unset.
def self.target_schema
  ENV.fetch('POSTGRES_TO_REDSHIFT_TARGET_SCHEMA')
end

# Source PostgreSQL schema to copy from; defaults to "public" when
# POSTGRES_TO_REDSHIFT_SOURCE_SCHEMA is unset.
def self.source_schema
  ENV.fetch('POSTGRES_TO_REDSHIFT_SOURCE_SCHEMA', 'public')
end

# Instance-level convenience delegating to the class-level target schema.
def target_schema
  self.class.target_schema
end

# Instance-level convenience delegating to the class-level source schema.
def source_schema
  self.class.source_schema
end

def source_connection
self.class.source_connection
end
Expand All @@ -68,7 +103,7 @@ def target_connection
end

def tables
source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')").map do |table_attributes|
source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = '#{source_schema}' AND table_type in ('BASE TABLE', 'VIEW') ORDER BY table_name ASC").map do |table_attributes|
table = Table.new(attributes: table_attributes)
next if table.name =~ /^pg_/
table.columns = column_definitions(table)
Expand All @@ -77,7 +112,7 @@ def tables
end

# Fetches ordered column metadata for +table+ from the configured source
# schema.
#
# Removes a leftover duplicate statement that ran an extra query against
# the hard-coded 'public' schema before the real one, and switches to a
# parameterized query so schema/table names cannot break out of the SQL
# string.
def column_definitions(table)
  source_connection.exec_params(
    "SELECT * FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position",
    [source_schema, table.name]
  )
end

def s3
Expand All @@ -90,13 +125,14 @@ def bucket

def copy_table(table)
tmpfile = Tempfile.new("psql2rs")
tmpfile.binmode
zip = Zlib::GzipWriter.new(tmpfile)
chunksize = 5 * GIGABYTE # uncompressed
chunk = 1
bucket.objects.with_prefix("export/#{table.target_table_name}.psv.gz").delete_all
begin
puts "Downloading #{table}"
copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"
copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{source_schema}.#{table.name}) TO STDOUT WITH DELIMITER '|'"

source_connection.copy_data(copy_command) do
while row = source_connection.get_copy_data
Expand All @@ -109,6 +145,7 @@ def copy_table(table)
zip.close unless zip.closed?
tmpfile.unlink
tmpfile = Tempfile.new("psql2rs")
tmpfile.binmode
zip = Zlib::GzipWriter.new(tmpfile)
end
end
Expand All @@ -130,8 +167,8 @@ def upload_table(table, buffer, chunk)

def import_table(table)
puts "Importing #{table.target_table_name}"
schema = self.class.schema
schema = self.class.target_schema

target_connection.exec("DROP TABLE IF EXISTS #{schema}.#{table.target_table_name}_updating")

target_connection.exec("BEGIN;")
Expand All @@ -142,6 +179,8 @@ def import_table(table)

target_connection.exec("COPY #{schema}.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

target_connection.exec("DROP TABLE IF EXISTS #{schema}.#{table.target_table_name}_updating")

target_connection.exec("COMMIT;")
end
end
10 changes: 9 additions & 1 deletion lib/postgres_to_redshift/column.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class PostgresToRedshift::Column
"oid" => "CHARACTER VARYING(65535)",
"ARRAY" => "CHARACTER VARYING(65535)",
"USER-DEFINED" => "CHARACTER VARYING(65535)",
"uuid" => "CHARACTER VARYING(65535)",
"inet" => "CHARACTER VARYING(65535)",
"name" => "CHARACTER VARYING(65535)",
"bit" => "CHARACTER(1)"
}

def initialize(attributes: )
Expand All @@ -74,7 +78,11 @@ def name_for_copy
end

# Returns the column's SQL data type. A bare "numeric" is expanded to an
# explicit numeric(precision,scale) when both values are present, so the
# target table does not fall back to the engine's default precision.
#
# Also removes a leftover dead expression (`attributes["data_type"]` on
# its own line) that preceded the conditional.
def data_type
  type = attributes["data_type"]
  if type == "numeric" && attributes["numeric_precision"] && attributes["numeric_scale"]
    "numeric(#{attributes["numeric_precision"]},#{attributes["numeric_scale"]})"
  else
    type
  end
end

def data_type_for_copy
Expand Down
29 changes: 29 additions & 0 deletions spec/features/small_table_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,32 @@
end
end
end

# Feature spec: tables matching the exclude pattern must not be copied.
RSpec.describe 'a small source database with table to be excluded', type: :feature do
  context 'with a simple column' do
    before(:all) do
      PostgresToRedshift::Test.test_connection.exec(<<-EOS
        DROP TABLE IF EXISTS "p2r_integration";
        DROP TABLE IF EXISTS "exclude_table";
        CREATE TABLE IF NOT EXISTS "p2r_integration" ("id" SERIAL PRIMARY KEY, "title" text);
        INSERT INTO "p2r_integration" ("title") VALUES ('Casablanca');
        CREATE TABLE IF NOT EXISTS "exclude_table" ("id" SERIAL PRIMARY KEY, "value" text);
        INSERT INTO "exclude_table" ("value") VALUES ('Test');
      EOS
      )
      # BUG FIX: the seed row for "value" was inserted into "p2r_integration",
      # which has no such column — the setup raised before any example ran.
    end
    after(:all) do
      # Also drop the excluded table so repeated runs start clean.
      PostgresToRedshift::Test.test_connection.exec(%q[DROP TABLE IF EXISTS "p2r_integration"; DROP TABLE IF EXISTS "exclude_table";])
      PostgresToRedshift::Test.test_target_connection.exec(%q[DROP TABLE IF EXISTS "p2r_integration";])
    end

    it 'Copies all rows to target table' do
      PostgresToRedshift.update_tables
      result = PostgresToRedshift::Test.test_target_connection.exec(
        'SELECT * FROM "p2r_integration";'
      )
      expect(result.num_tuples).to eq(1)
      expect(result[0]).to eq('title' => 'Casablanca', 'id' => '1')
    end
  end
end