I'm working on a project where we need to upload data from Oracle databases to Datastore very frequently. The datastore bulkloader bundled in the App Engine Python SDK works very well for importing and exporting data. It currently offers 3 connector implementations: csv, xml and simpletext.
The CSV connector was working smoothly for us, but I wanted to avoid the intermediate CSV file generation and bulkload directly from Oracle. After some study I decided to write a new connector: the oracle_connector.
The only prerequisite for using this connector is having the cx_Oracle Python module properly installed.
Here are some details about my environment:
- Ubuntu 11.10 - 64 bits
- Python 2.7.2+
- cx_Oracle 5.0.4 unicode
- Oracle Instant Client 11.2
- Google App Engine Python SDK 1.6.0
Some design decisions
I wanted to specify a SQL query for every kind/entity in the yaml config file and then map the returned (selected) column names or aliases to the properties via the external_name attribute. I didn't find a way to use a custom connector_options entry like "sql_query", so I reused the existing columns option to hold the query.
I also wanted to keep the database connection properties outside the connector implementation. To achieve that, I decided to store these settings in an external file, which is passed to the appcfg.py script through the command line --filename parameter.
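The trick of importing the settings file named by --filename can be sketched like this (a minimal illustration; load_settings and the example module name are my own, not part of the connector):

```python
import os.path


def load_settings(filename):
    """Import a Python settings file by stripping its extension.

    Assumes the file's directory is on sys.path, which is true when
    appcfg.py is run from the directory containing db_settings.py.
    """
    module_name = os.path.splitext(os.path.basename(filename))[0]
    return __import__(module_name)

# Example: load_settings('db_settings.py') returns the db_settings module,
# exposing db_settings.host, db_settings.port, and so on.
```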
The actual implementation
It basically comprises 3 files:
- bulkloader.yaml - imports and utilizes the oracle_connector as well as maps queries to entities
- oracle_connector.py - connector implementation based on cx_Oracle (doesn't work for exports)
- db_settings.py - defines connection properties variables used by oracle_connector
Invoking the bulkloader is very simple:
$APPENGINE_HOME/appcfg.py upload_data --config_file=bulkloader.yaml --kind=Table --filename=db_settings.py --email=user@gmail.com --url=http://app-id.appspot.com/remote_api
The bulkloader.yaml below shows how to import and use the oracle_connector. Also note the connector_options attribute.
python_preamble:
- import: base64
- import: re
- import: oracle_connector
- import: google.appengine.ext.bulkload.transform
- import: google.appengine.ext.bulkload.bulkloader_wizard
- import: google.appengine.ext.db
- import: google.appengine.api.datastore
- import: google.appengine.api.users

transformers:
- kind: Table
  connector: oracle_connector.OracleConnector.create_from_options
  connector_options:
    columns: "select TABLE_NAME, TABLESPACE_NAME, LAST_ANALYZED from user_tables"
  property_map:
    - property: __key__
      external_name: TABLE_NAME
    - property: tablespace
      external_name: TABLESPACE_NAME
    - property: last_analyzed
      external_name: LAST_ANALYZED
There are some known issues here: 1) you must use uppercase strings in the external_name attribute (Oracle returns unquoted column names in uppercase) and 2) I wasn't able to return number columns (had to use the to_char Oracle function) due to some problem in my cx_Oracle installation.
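As a workaround for issue 2, the query in the columns option can cast numbers to strings with to_char. The NUM_ROWS column below is just an illustrative example; note that the alias stays uppercase so it matches the external_name in the property_map:

```yaml
connector_options:
  columns: "select TABLE_NAME, to_char(NUM_ROWS) as NUM_ROWS from user_tables"
```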
Below is the OracleConnector class, which implements connector_interface.ConnectorInterface. This was my first piece of Python code. Let me know if I can improve it!
#!/usr/bin/env python
"""A bulkloader connector to read data from Oracle selects."""

import os.path

import cx_Oracle

from google.appengine.ext.bulkload import bulkloader_errors
from google.appengine.ext.bulkload import connector_interface


class OracleConnector(connector_interface.ConnectorInterface):

    @classmethod
    def create_from_options(cls, options, name):
        """Factory using an options dictionary.

        Args:
            options: Dictionary of options:
                columns: SQL query to perform; each selected column becomes
                    a column.
            name: The name of this transformer, for use in error messages.

        Returns:
            OracleConnector object described by the specified options.

        Raises:
            InvalidConfiguration: If the config is invalid.
        """
        columns = options.get('columns', None)
        if not columns:
            raise bulkloader_errors.InvalidConfiguration(
                'Sql query must be specified in the columns '
                'configuration option. (In transformer name %s.)' % name)
        return cls(columns)

    def __init__(self, sql_query):
        """Initializer.

        Args:
            sql_query: (required) select query which will be sent to the
                database. The returned columns/aliases will be used as the
                connector's column names.
        """
        self.sql_query = unicode(sql_query)

    def generate_import_record(self, filename, bulkload_state):
        """Generator, yields one neutral dict per row returned by the query.

        Args:
            filename: py script containing the Oracle database connection
                properties: host, port, uid, pwd and service.
            bulkload_state: Passed bulkload_state.

        Yields:
            Neutral dict, one per row returned by the SQL query.
        """
        # Import the settings module named by --filename (must be on sys.path).
        dbprops = __import__(os.path.splitext(filename)[0])
        dsn_tns = cx_Oracle.makedsn(dbprops.host, dbprops.port, dbprops.service)
        connection = cx_Oracle.connect(dbprops.uid, dbprops.pwd, dsn_tns)
        cursor = connection.cursor()
        try:
            cursor.arraysize = dbprops.cursor_arraysize
            cursor.execute(self.sql_query)
            # Column names come from the first element of each description tuple.
            field_names = [desc[0] for desc in cursor.description]
            for row in cursor.fetchall():
                yield dict(zip(field_names, row))
        finally:
            # Close even if the consumer abandons the generator early.
            cursor.close()
            connection.close()
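The core of generate_import_record is just zipping the column names from cursor.description with each fetched row. A self-contained sketch of that pattern, using a stubbed cursor (FakeCursor is my stand-in for a real cx_Oracle cursor, and rows_as_dicts is hypothetical):

```python
class FakeCursor(object):
    """Stand-in for a cx_Oracle cursor: exposes description and fetchall()."""
    # Each description entry is a tuple whose first element is the column name.
    description = [('TABLE_NAME', None), ('TABLESPACE_NAME', None)]

    def fetchall(self):
        return [('EMPLOYEES', 'USERS'), ('DEPARTMENTS', 'USERS')]


def rows_as_dicts(cursor):
    """Yield one {column_name: value} dict per fetched row."""
    field_names = [desc[0] for desc in cursor.description]
    for row in cursor.fetchall():
        yield dict(zip(field_names, row))


for record in rows_as_dicts(FakeCursor()):
    print(record)
```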
And finally, the contents of db_settings.py:
uid=u'database_username'
pwd=u'database_password'
host="database_host"
port=1521
service="service"
cursor_arraysize = 50
Get the code
You can download the code here:
https://github.com/fabito/gae_bulkloader_connectors
Summary
This post shows an alternative connector implementation for importing data directly from an Oracle database. The approach could easily be extended to support other RDBMSs such as MySQL or PostgreSQL. We still have to run some tests to check how it behaves under different loads and data types, but so far it seems promising.