https://github.com/datainsightat/DataScience_Examples/tree/main/de/dbt
pip install virtualenv
virtualenv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
dbt compile
dbt run
dbt run --select src_hosts+
dbt snapshot
dbt test
Get packages from https://hub.getdbt.com/ and reference them in packages.yml
dbt deps
dbt docs generate
dbt docs serve

Data Collection > Data Wrangling > Data Integration > BI and Analytics > Artificial Intelligence
Write data from the staging area to the database.
A central repository of integrated data from one or more disparate sources. It is used to store structured data.

Storage of unstructured data like images and text files.
Data management like in a data warehouse, but the data is stored in cheap data lakes.
| Type | Description | 
|---|---|
| 0 | Not updating the DWH when a dimension changes | 
| 1 | Overwrite original data | 
| 2 | Add new row | 
| 3 | Add new attribute | 
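With type 2, a change does not overwrite the old row; the old row is closed and a new row is added. A minimal sketch of the resulting rows (table and column values are made up):
-- Hypothetical dimension rows after a host changed their e-mail (SCD type 2)
select 42 as host_id, 'old@mail.com' as email, '2022-01-01' as valid_from, '2022-06-01' as valid_to
union all
select 42 as host_id, 'new@mail.com' as email, '2022-06-01' as valid_from, null as valid_to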
-- STEP 1
with raw_listings as (
-- STEP 2
    select * from [source] [listings]
)
-- STEP 3
select * from raw_listings
Each processor is treated equally and runs in parallel with the others. The workload is evenly distributed when processing the program, and each processor shares the same resources: a single computer with multiple CPU cores.
This architecture can only scale vertically.
Master node and compute nodes. Each processor has its own dedicated resources. Many computers work together.

Compute nodes are shut down if not needed; the data persists.
Row-oriented databases are good at reading and writing individual records, but not efficient for analytical workloads.



| Type | Use | Don’t Use | 
|---|---|---|
| View | Lightweight representation | Data often used | 
| Table | Data often used | Single-use models, Incremental tables | 
| Incremental | Fact tables, Appends | Update historical records | 
| Ephemeral | Alias data | Read model several times | 
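Materializations are set per model, either in dbt_project.yml or with a config block at the top of the model file; a minimal sketch (the model body is made up):
{{ config(materialized='table') }}
select * from {{ ref('src_hosts') }}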
Copy the CSV file into the ‘seeds’ folder
dbt seed
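Once loaded, a seed is referenced like any other model via ref(); a hypothetical example (the seed name is an assumption):
select * from {{ ref('seed_full_moon_dates') }}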
Sources are abstractions on top of the raw input tables.
with raw_reviews as (
    select * from {{ source('airbnb', 'reviews') }} --AIRBNB.RAW.RAW_REVIEWS
)
Check if the data is current. Configure freshness in ‘sources.yml’
sources:
  - name: airbnb
    schema: raw
    tables:
      - name: reviews
        identifier: raw_reviews
        loaded_at_field: date
        freshness:
          warn_after: {count: 1, period: hour}
          error_after: {count: 24, period: hour}
Run the command:
dbt source freshness
Handle type 2 slowly changing dimensions. Example: update an e-mail address and keep the history. dbt adds two columns: dbt_valid_from and dbt_valid_to.
| Strategy | How To | 
|---|---|
| Timestamp | A unique key column plus an updated_at column | 
| Check | Any change in a set of columns triggers a snapshot | 
snapshots/scd_raw_listings.sql
{% snapshot scd_raw_listings %}
select * from {{ source('airbnb', 'listings') }}
{% endsnapshot %}
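Inside the snapshot block, a config sets the target schema, unique key, and strategy; a minimal sketch assuming the timestamp strategy (schema and column names are assumptions):
{{
  config(
    target_schema='dev',
    unique_key='id',
    strategy='timestamp',
    updated_at='updated_at'
  )
}}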
models/schema.yml
version: 2
models:
  - name: dim_listings_cleansed
    columns:
      - name: listing_id
        tests:
          - unique
          - not_null
          
      - name: host_id
        tests:
          - relationships:
              to: ref('dim_hosts_cleaned')
              field: host_id
              
      - name: room_type
        tests:
          - accepted_values:
              values: ['Entire home/apt',
                      'Private room',
                      'Shared room',
                      'Hotel room']
A test passes if the query returns no rows.
tests/dim_listings_minimum_nights.sql
select
    *
from
    {{ ref('dim_listings_cleansed') }}
where
    minimum_nights < 1
limit 10
tests/consistent_created_at.sql
SELECT
    *
FROM
    {{ ref('dim_listings_cleansed') }} l
INNER JOIN {{ ref('fct_reviews') }} r
    USING (listing_id)
WHERE
    l.created_at >= r.review_date
macros/no_nulls_in_columns.sql
{% macro no_nulls_in_columns(model) %}
    select
        *
    from
        {{ model }}
    where
        {% for col in adapter.get_columns_in_relation(model) -%}
            {{ col.column }} is null or
        {% endfor %}
            false
{% endmacro %}
tests/no_nulls_in_dim_listings.sql
{{ no_nulls_in_columns(ref('dim_listings_cleansed')) }}
dbt test --select dim_listings_cleansed
macros/positive_value.sql
{% test positive_value(model, column_name) %}
    select
        *
    from
        {{ model }}
    where
        {{ column_name }} < 1
{% endtest %}
models/schema.yml
- name: minimum_nights
  tests:
    - positive_value
https://hub.getdbt.com/
packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: 0.8.0
models/fct/fct_reviews.sql
with src_reviews as (
    select * from {{ ref('src_reviews') }}
)
select
    {{ dbt_utils.surrogate_key(['listing_id', 'review_date', 'reviewer_name', 'review_text']) }} as review_id,
    *
from
    src_reviews
where
    review_text is not null
{% if is_incremental() %}
    and review_date > (select max(review_date) from {{ this }})
{% endif %}
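The incremental model itself is configured with a config block at the top of the file; a minimal sketch (the on_schema_change setting is an assumption):
{{
  config(
    materialized='incremental',
    on_schema_change='fail'
  )
}}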
$ dbt run --full-refresh --select fct_reviews
models/schema.yml
version: 2
models:
  - name: dim_listings_cleansed
    description: Cleansed table which contains Airbnb listings.
    columns:
      - name: listing_id
        description: Primary key for the listing
models/docs.md
{% docs dim_listing_cleansed__minimum_nights %}
Minimum number of nights required to rent this property.
Keep in mind that old listings might have 'minimum_nights' set
to 0 in the source tables. Our cleansing algorithm updates this
to '1'.
{% enddocs %}
models/schema.yml
- name: minimum_nights
  description: '{{ doc("dim_listing_cleansed__minimum_nights") }}'
Put images in the “assets” folder
dbt_project.yml
asset-paths: ["assets"]
models/overview.md
{% docs __overview__ %}
# Airbnb Pipeline
Hey, welcome to our Airbnb pipeline documentation!
Here is the schema of our input data:

{% enddocs %}
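Images from the assets folder can then be embedded inside the docs block; a hypothetical example (the file name is an assumption):
![input schema](assets/input_schema.png)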
Queries without creating a model.
analyses/full_moon_no_sleep.sql
with mart_fullmoon_reviews as (
    select * from {{ ref('mart_fullmoon_reviews') }}
)
select
    is_full_moon,
    review_sentiment,
    count(*) as reviews
from
    mart_fullmoon_reviews
group by
    is_full_moon,
    review_sentiment
order by
    is_full_moon,
    review_sentiment
$ dbt compile
$ less target/compiled/dbtlearn/analyses/full_moon_no_sleep.sql
Exposures are links to external applications.
models/dashboard.yml
version: 2
exposures:
    - name: Executive Dashboard
      type: dashboard
      maturity: low
      url: XXX
      description: Executive Dashboard
      
      depends_on:
        - ref('dim_listings_cleansed')
        - ref('mart_fullmoon_reviews')
        
      owner:
        name: me
        email: me@email.com
https://docs.getdbt.com/docs/building-a-dbt-project/building-models/python-models
models/my_python_model.py
def model(dbt, session):
    ...
    return final_df
def model(dbt, session):
    # DataFrame representing an upstream model
    upstream_model = dbt.ref("upstream_model_name")
    # DataFrame representing an upstream source
    upstream_source = dbt.source("upstream_source_name", "table_name")
    ...
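A minimal end-to-end sketch of a Python model, assuming the adapter hands back a pandas-compatible DataFrame (model and column names are made up):
def model(dbt, session):
    # configure this Python model's materialization
    dbt.config(materialized="table")

    # read an upstream dbt model as a DataFrame
    reviews_df = dbt.ref("fct_reviews")

    # example transformation: keep only reviews with text
    final_df = reviews_df[reviews_df["review_text"].notnull()]

    return final_df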
models/downstream_model.sql
with upstream_python_model as (
    select * from {{ ref('my_python_model') }}
),
...
/loop.sh
#!/bin/bash
clear
echo "#### Set variables ... ####"
START_MONTH=202201
END_MONTH=202212
declare -i periods=$END_MONTH-$START_MONTH
echo $periods
echo "#### Loop ... ####"
# Write new table with first period
declare -i period=$START_MONTH
echo $period
# Full refresh of model 'model_increment' with the first period
dbt run --vars '{start_period: '$period',end_period: '$period'}' --full-refresh --select model_increment --target schema
# Append periods, if more than 1 period
if [ $periods -gt 1 ]
then
    for i in $(seq 1 $periods)
    do
        declare -i period=$START_MONTH+$i
        echo $period
        dbt run --vars '{start_period: '$period',end_period: '$period'}' --select model_increment --target schema
    done
fi
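The model run in this loop would read the passed variables with dbt's var() function; a hypothetical sketch of model_increment (source and column names are assumptions):
select *
from {{ source('raw', 'transactions') }}
where period between {{ var('start_period') }} and {{ var('end_period') }}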
import requests
import xmltodict
from requests_ntlm2 import HttpNtlmAuth
import pandas as pd
import os
import re
import logging
logging.basicConfig(level=logging.INFO)
logging.info('Module sharepoint loaded')
class SharepointSource(object):
    def __init__(self,site_url,user,password,db_name=None, db_user=None, db_pw=None): #,listener_table=None
        self.site_url = site_url
        self.api_url = "{0}_api/web/".format(self.site_url)
        self.user = user
        self.authntlm = HttpNtlmAuth(user,password)
    def get_list_filenames(self,page_url):
        sp_url = "{0}GetFolderByServerRelativeUrl('{1}')/files".format(self.api_url,page_url) #/Files?$select=Name
        response = requests.get(sp_url,auth=self.authntlm)
        data = xmltodict.parse(response.text)
        # xmltodict returns a dict (not a list) when the folder contains a single file
        entries = data["feed"]["entry"]
        if not isinstance(entries, list):
            entries = [entries]
        files = []
        for entry in entries:
            files.append([entry["content"]["m:properties"]["d:Name"],
                          entry["content"]["m:properties"]["d:TimeLastModified"]["#text"]])
        return(files)
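A hypothetical usage sketch of the class above (URL, folder, and credentials are made up):
from sharepointAPI import sharepoint

sp = sharepoint.SharepointSource(
    site_url="https://sharepoint.example.com/sites/demo/",
    user="DOMAIN\\user",
    password="secret",
)
for name, last_modified in sp.get_list_filenames(page_url="Shared Documents/reports"):
    print(name, last_modified)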
import os
import pandas as pd
import numpy as np
import re
import yaml
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import logging
logging.basicConfig(level=logging.INFO)
from sharepointAPI import sharepoint
###############
# Load Config #
###############
with open(os.path.dirname(__file__)+'/src_config.yaml', 'r') as f:
    config = yaml.safe_load(f)
################
# Extract Data #
################
def extract():
    site_url=config['site_url']['upload']
    sp_folder = config['folder']['datenbasis']
    sp = sharepoint.SharepointSource(site_url,user=os.getenv(config['sharepoint']['user_env']),password=os.getenv(config['sharepoint']['pw_env']))
    name = sp.get_latest_filename(page_url=sp_folder)[0]
    logging.info(name)
    raw_data = sp.get_file(page_url=sp_folder,file=name)
    data = pd.read_excel(raw_data,sheet_name=0,engine='openpyxl')
    logging.debug('extract done ...')
    return data
data_l = extract()
logging.debug(data_l.head())
#####################
# Paths and Folders #
#####################
src_config.yaml
sharepoint:
  user_env: 'user' # Set the value of this variable in your .env file
  pw_env: 'pw' # Set the value of this variable in your .env file
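The referenced environment variables are loaded from a .env file via python-dotenv; a hypothetical example (values are made up):
user=DOMAIN\myuser
pw=my-secret-password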