Collection of transforms for the Apache Beam Python SDK.
A collection of miscellaneous transforms for the Apache Beam Python SDK. Many are simple transforms; the most useful ones are those for reading from and writing to relational databases.
Install from PyPI:

```sh
pip install beam-nuggets
```

Or install from source:

```sh
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .
```
See the beam-nuggets documentation for details.
Write data to an SQLite table using beam-nuggets' relational_db.Write transform.
```python
# write_sqlite.py contents
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2}
]

source_config = relational_db.SourceConfiguration(
    drivername='sqlite',
    database='/tmp/months_db.sqlite',
    create_if_missing=True  # create the database if not there
)

table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,  # automatically create the table if not there
    primary_key_columns=['num']  # and use 'num' column as primary key
)

with beam.Pipeline(options=PipelineOptions()) as p:  # no runner set, so the local DirectRunner is used
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
```
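The pipeline above feeds relational_db.Write one dict per row, with keys matching the table's columns (`name` and `num`). If your input arrives as text lines instead, a small parsing step can produce that shape first. This is a minimal sketch assuming a hypothetical `name,num` CSV format; `to_month_record` is an illustrative helper, not part of beam-nuggets.

```python
import csv


def to_month_record(line):
    """Parse a hypothetical 'name,num' CSV line into a row dict.

    The dict keys mirror the records used in write_sqlite.py; that Write
    maps dict keys to table columns is assumed from that example.
    """
    name, num = next(csv.reader([line]))  # csv handles quoted fields
    return {'name': name.strip(), 'num': int(num)}
```

In a pipeline it could sit between a text source and the write, for example `p | beam.io.ReadFromText('months.csv') | beam.Map(to_month_record)`, where `months.csv` is a placeholder path.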
Execute the pipeline:

```sh
python write_sqlite.py
```

Examine the table contents:

```sh
sqlite3 /tmp/months_db.sqlite 'select * from months'
# output:
# 1.0|Jan
# 2.0|Feb
```
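If the `sqlite3` command-line tool is not installed, the same check can be made with Python's standard-library `sqlite3` module. A minimal sketch, assuming the database path and `months` table from the example above; `read_months` is a hypothetical helper name.

```python
import sqlite3


def read_months(db_path='/tmp/months_db.sqlite'):
    """Return all (num, name) rows from the 'months' table, ordered by num."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute('SELECT num, name FROM months ORDER BY num').fetchall()
    finally:
        conn.close()  # sqlite3's context manager commits but does not close
```

Call `read_months()` after running `write_sqlite.py`; it returns the rows as `(num, name)` tuples.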
To write the same data to a PostgreSQL table instead, just create a suitable relational_db.SourceConfiguration as follows.
```python
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
    create_if_missing=True  # create the database if not there
)
```
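Hard-coding the password is fine for a local demo, but you may prefer to read connection settings from the environment. The sketch below is an assumption, not a beam-nuggets feature: the hypothetical helper `pg_config_kwargs` reads the standard PostgreSQL client variables (`PGHOST`, `PGPORT`, `PGUSER`, `PGPASSWORD`, `PGDATABASE`) and returns keyword arguments with the same names as the SourceConfiguration call above.

```python
import os


def pg_config_kwargs(env=None):
    """Build SourceConfiguration keyword arguments from libpq-style env vars.

    Non-secret defaults mirror the PostgreSQL example; the password
    defaults to an empty string rather than a hard-coded value.
    """
    env = os.environ if env is None else env
    return {
        'drivername': 'postgresql+pg8000',
        'host': env.get('PGHOST', 'localhost'),
        'port': int(env.get('PGPORT', '5432')),
        'username': env.get('PGUSER', 'postgres'),
        'password': env.get('PGPASSWORD', ''),
        'database': env.get('PGDATABASE', 'calendar'),
    }
```

Usage: `source_config = relational_db.SourceConfiguration(create_if_missing=True, **pg_config_kwargs())`.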
More examples are available in the beam-nuggets documentation, including writing to PostgreSQL in Google Cloud Platform using the DataflowRunner.
The following example shows how to use beam-nuggets' relational_db.ReadFromDB transform to read from a PostgreSQL database table.
```python
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='months',
        query='select num, name from months'  # optional. When omitted, all table records are returned.
    )
    records | 'Writing to stdout' >> beam.Map(print)
```
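The example above only prints each record. Assuming each record is a dict keyed by column name (the same shape as the rows written earlier), a small helper can turn rows into `(key, value)` pairs so they can feed keyed Beam transforms such as `beam.GroupByKey` or `beam.CombinePerKey`. `to_kv` is a hypothetical helper, not part of beam-nuggets.

```python
def to_kv(record, key_column='num'):
    """Split a row dict into (key, remaining_columns).

    Assumes the row is a dict keyed by column name; raises KeyError if
    key_column is missing.
    """
    rest = {col: val for col, val in record.items() if col != key_column}
    return record[key_column], rest
```

For example, `records | beam.Map(to_kv) | beam.GroupByKey()`, or `beam.Map(to_kv, key_column='name')` to key by a different column.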
See the beam-nuggets documentation for more examples.
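To set up a development environment, clone the repository and install it in editable mode with the development extras:

```sh
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=$(pwd)
pip install -e ".[dev]"
```

Run the unit tests:

```sh
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
```

Generate the documentation:

```sh
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
```

Build, test and deploy:

```sh
cd $BEAM_NUGGETS_ROOT
scripts/build_test_deploy.sh
```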
Contributors: mohaseeb, astrocox, 2514millerj, alfredo, shivangkumar
License: MIT