Data Factory Ingestion Framework: Part 1 - Schema Loader
A business wants to use cloud technology to enable data science and augment its data warehouse by staging and prepping data in a data lake. The data lives in many systems, which vary both by system type and by the number of instances of each type.
Example source types: SQL Server, Oracle, Teradata, SAP HANA, Azure SQL, flat files, etc.
Hand-coding ingestion for each of these systems could take months of development time across multiple people.
We need a way to ingest data by source type. In other words, the only new development we want to take on is adding a new source type.
Over the next four blogs, I will lay out the architecture for an automated ingestion framework that only ever needs two Data Factory pipelines and can be configured down to an individual dataset. This is a metadata-driven approach.
Part 1: The Schema Loader: Ingest source schema/metadata by system type
Part 2: The Metadata Model: built using Data Vault - this is the secret behind the solution
Part 3: The Data Loader: This will read from the Metadata Model and copy data by source type
Part 4: Data Processing: The Metadata Model will also feed the processing architecture using Apache Spark and Scala. Consider this ETL automation at scale.
The Schema Loader
The Schema Loader has a single job: extract schema data from a given source system type, whether that is SQL Server, SAP HANA, Teradata, a file system, or any other Linked Service type provided by Data Factory. The pipeline has four steps:
1. Get-SourceSystemList: Get a list of distinct source systems/databases to extract schema from. The list must have a system type column. For its values, align with Data Factory's naming convention for types: sql, asql, sapHana, etc.
2. Truncate-Target: Run a stored procedure that truncates all staging tables. You will have one staging table per source type.
3. Extract-Metadata: A ForEach activity that contains one If Condition per source type. Inside each If Condition lives a copy activity that holds the schema extract query for that source type and copies the result to its designated staging table within the metadata model. These copy activities must use dynamic linked services so that we need only one copy activity per source type. The data we are pulling includes table names, column names, data types, constraints, etc.
4. Load-Model: Run a stored procedure that loads the data from the staging tables into the metadata model. *We have not yet reviewed the metadata model; subscribe to keep up with the blogs.
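The control flow of these four activities can be sketched outside Data Factory as a small Python driver. This is only an illustration, not Data Factory code: `run_schema_loader`, `fake_sql_extractor`, the `server` and `database` fields, and the in-memory `staging` dictionary are all hypothetical stand-ins for the pipeline activities, the stored procedures, and the staging tables.

```python
from typing import Callable, Dict, List, NamedTuple


class SourceSystem(NamedTuple):
    # Field names are hypothetical; the post only requires a system type column.
    linked_service_type: str  # e.g. "sql", "asql", "sapHana"
    server: str
    database: str


# One extractor per source type; each returns schema rows for one system.
Extractor = Callable[[SourceSystem], List[dict]]


def run_schema_loader(
    systems: List[SourceSystem],            # result of Get-SourceSystemList
    extractors: Dict[str, Extractor],
    staging: Dict[str, List[dict]],
    load_model: Callable[[Dict[str, List[dict]]], None],
) -> None:
    # Truncate-Target: empty every staging table, one per source type.
    for source_type in extractors:
        staging[source_type] = []

    # Extract-Metadata: loop over the systems and route each one by type.
    for system in systems:
        extract = extractors.get(system.linked_service_type)
        if extract is None:
            continue  # no branch has been built for this source type yet
        staging[system.linked_service_type].extend(extract(system))

    # Load-Model: hand the staged rows to the metadata model loader.
    load_model(staging)


def fake_sql_extractor(system: SourceSystem) -> List[dict]:
    # Stand-in for the copy activity's schema query.
    return [{"server": system.server, "database": system.database,
             "table": "dbo.Customer", "column": "CustomerId"}]
```

Supporting a new source type in this sketch means adding one entry to `extractors`, which mirrors the goal of limiting development to new source types.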
Activity: Extract-Metadata example:
Inside the ForEach activity of Extract-Metadata we first have a list of If Condition activities, one per source type. For this example we have Azure SQL and on-prem SQL Server. Source types follow the native connectors already built into Azure Data Factory. Because we are passing in a list of source systems, we can simply use an expression such as @equals(item().LinkedService_Type,'sql') to route each source system to the activity for its type.
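The routing can be mimicked in a few lines of Python. This is a sketch under assumptions, not how Data Factory evaluates expressions: `equals` here is plain string equality, the `Database` key and the branch descriptions are hypothetical, and the mapping of 'sql' to on-prem SQL Server and 'asql' to Azure SQL is assumed from the type names above.

```python
from typing import Callable, Dict, List

Item = Dict[str, str]


def equals(left: str, right: str) -> bool:
    # Stand-in for the expression function; plain equality is assumed.
    return left == right


def route(item: Item, branches: Dict[str, Callable[[Item], str]]) -> List[str]:
    """Evaluate every If Condition for one ForEach item and run the matching branch."""
    results = []
    for source_type, copy_activity in branches.items():
        if equals(item["LinkedService_Type"], source_type):
            results.append(copy_activity(item))
    return results


# One branch per source type, each wrapping that type's copy activity.
branches = {
    "sql": lambda item: f"on-prem SQL Server extract for {item['Database']}",
    "asql": lambda item: f"Azure SQL extract for {item['Database']}",
}
```

Every condition is checked for every item, but only the branch whose type matches does any work; an item with an unknown type simply falls through.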
Copy Metadata Example
Here you can see we are also passing in the associated Server and Database per source system configuration record. An example of a SQL Server metadata extract is also included below the screenshot.
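-- Lines marked "assumed" fill gaps in the published listing and are not from the original query.
DECLARE @version VARCHAR(1000) = @@VERSION; -- assumed value; the original right-hand side is missing
DECLARE @Tables TABLE
(
    src_TableSchema_type NVARCHAR(60),   -- column types are assumed throughout
    src_TableName_Full   NVARCHAR(517),
    src_ColumnName       NVARCHAR(128),  -- assumed column
    src_DataType         NVARCHAR(128),  -- assumed column
    is_primary_key       INT
);
INSERT INTO @Tables
SELECT t.type_desc AS src_TableSchema_type,
       '[' + s.name + ']' + '.[' + t.name + ']' COLLATE DATABASE_DEFAULT AS src_TableName_Full,
       c.COLUMN_NAME AS src_ColumnName,  -- assumed
       ty.name AS src_DataType,          -- assumed
       ISNULL(CAST(ix.is_primary_key AS INT), 0) AS is_primary_key
FROM
(   -- assumed: tables and views are combined here; the original shows both FROM clauses
    SELECT object_id, schema_id, name, type_desc FROM sys.tables
    UNION ALL
    SELECT object_id, schema_id, name, type_desc FROM sys.views
) t
JOIN sys.schemas s
    ON t.schema_id = s.schema_id
JOIN INFORMATION_SCHEMA.COLUMNS c
    ON c.TABLE_NAME = t.name COLLATE DATABASE_DEFAULT
   AND c.TABLE_SCHEMA = s.name
JOIN sys.columns co
    ON co.object_id = t.object_id
   AND co.name = c.COLUMN_NAME
JOIN sys.types ty
    ON co.user_type_id = ty.user_type_id
LEFT JOIN
(   -- primary-key columns; the LEFT JOIN wrapper and its select list are assumed
    SELECT ic.object_id, ic.column_id, i.is_primary_key
    FROM sys.index_columns ic
    JOIN sys.indexes i
        ON i.index_id = ic.index_id
       AND i.object_id = ic.object_id
    WHERE i.is_primary_key = 1
) ix
    ON ix.object_id = co.object_id
   AND ix.column_id = co.column_id
WHERE c.TABLE_CATALOG NOT IN ( 'master', 'tempdb', 'msdb', 'model' );
SELECT * FROM @Tables;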
For step 3 above to get by with only one copy activity per source type, we must use dynamic linked services. I created this slide to help explain how dynamic linked services work. I will go over them in detail in a later blog.
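Until that blog, the core idea can be sketched in Python: a single connection definition carries placeholders, and each source system record supplies the values at run time. The template string, the parameter names `ServerName` and `DatabaseName`, and the `resolve` helper are hypothetical; only the `@{linkedService().<name>}` placeholder form is borrowed from how Data Factory references linked service parameters, and Data Factory performs this substitution itself.

```python
import re
from typing import Dict

# Hypothetical connection string template shared by every system of one source type.
TEMPLATE = "Server=@{linkedService().ServerName};Database=@{linkedService().DatabaseName};"

_PLACEHOLDER = re.compile(r"@\{linkedService\(\)\.(\w+)\}")


def resolve(template: str, parameters: Dict[str, str]) -> str:
    """Substitute each linked service parameter; raises KeyError if one is missing."""
    return _PLACEHOLDER.sub(lambda match: parameters[match.group(1)], template)


# Each source system configuration record supplies its own server and database.
for record in [
    {"ServerName": "srv1", "DatabaseName": "Sales"},
    {"ServerName": "srv2", "DatabaseName": "HR"},
]:
    print(resolve(TEMPLATE, record))
```

Because the server and database arrive as parameters, one linked service and one copy activity can serve every system of that source type.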