DataScience_Examples

All about DataScience, DataEngineering and ComputerScience


Apache Beam

Source

https://www.udemy.com/share/102dea3@eL-yo5Kxb3rwTFhsHjhoKrSfdMPGMf6NtzfCEg1P1-m11kFEWjMTBIV9gGxDt792/

Introduction

Unified programming model for efficient and portable big data processing pipelines. Beam can process both batch and streaming data (Batch + Stream = Beam). A Beam pipeline can be written in languages such as Java, Python, or Go, and run on any of the supported execution frameworks such as Spark, Flink, Apex, or Cloud Dataflow.

Evolution of Big Data Frameworks

Beam

Architecture

Beam

Flow of Beam Programming Model

Input > Transform > PCollection > Transform > PCollection > Transform > Output

Example

Input > Read > P1 > Filter > P2 > GroupBy > P3 > Sum > Output

import apache_beam as beam

p1 = beam.Pipeline()

attendance_count = (
    p1
    | beam.io.ReadFromText('dept_data.txt')
    | beam.Map(lambda record: record.split(','))
    | beam.Filter(lambda record: record[3] == 'accounts')
    | beam.Map(lambda record: (record[1], 1))
    | beam.CombinePerKey(sum)
    | beam.io.WriteToText('data/output_new_final')
)

p1.run()

Basic Terminology

Installation

Google Colab: https://colab.research.google.com/?utm_source=scs-index
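Outside Colab, Beam can be installed with pip (a minimal sketch; the gcp extra is only needed for the Google Cloud connectors):

pip install apache-beam

# With Google Cloud I/O connectors (Pub/Sub, BigQuery, Dataflow)
pip install "apache-beam[gcp]"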

Transformations in Beam

Read Transforms

Files
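A minimal sketch of file-based read transforms; the file patterns are placeholders, and Avro/Parquet support may need extra dependencies:

import apache_beam as beam

with beam.Pipeline() as p:
    lines = p | beam.io.ReadFromText('dept_data.txt')       # one element per line
    records = p | beam.io.ReadFromAvro('data/*.avro')        # Avro records as dicts
    rows = p | beam.io.ReadFromParquet('data/*.parquet')     # Parquet rows as dicts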

Messaging Queues
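A sketch of reading from a messaging queue (Google Cloud Pub/Sub); the project and subscription names are placeholders and streaming mode is assumed:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    # Read either from a topic or from an existing subscription
    msgs = p | beam.io.ReadFromPubSub(subscription='projects/my-project/subscriptions/my-sub')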

Write Transforms
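A sketch of common write transforms; the output path is a placeholder:

import apache_beam as beam

with beam.Pipeline() as p:
    counts = p | beam.Create([('accounts', 3), ('hr', 5)])
    (counts
        | beam.Map(str)                                              # format each element as a string
        | beam.io.WriteToText('output/result', file_name_suffix='.txt'))
    # Other sinks include beam.io.WriteToAvro, beam.io.WriteToPubSub and beam.io.WriteToBigQuery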

ParDo Transform

A ParDo transform takes each element of the input PCollection, performs a processing function on it, and emits zero, one, or multiple elements. By default, ParDo can return multiple elements per input (like FlatMap).
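A minimal sketch of a ParDo; the DoFn name is illustrative:

import apache_beam as beam

class SplitRowFn(beam.DoFn):
    def process(self, element):
        # Yield zero, one or many outputs per input element
        yield element.split(',')

with beam.Pipeline() as p:
    (p
        | beam.Create(['Marco,accounts', 'Rebekah,hr'])
        | beam.ParDo(SplitRowFn())
        | beam.Map(print))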

Combiner
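A sketch of a custom combiner that averages values per key; the class name and data are made up:

import apache_beam as beam

class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)                                  # (running sum, count)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else 0

with beam.Pipeline() as p:
    (p
        | beam.Create([('accounts', 8), ('accounts', 6), ('hr', 4)])
        | beam.CombinePerKey(AverageFn())
        | beam.Map(print))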

CoGroupByKey

Relational join of two or more key/value PCollections
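A sketch of joining two keyed PCollections; the data is made up:

import apache_beam as beam

with beam.Pipeline() as p:
    emails = p | 'emails' >> beam.Create([('marco', 'marco@x.com'), ('julia', 'julia@x.com')])
    phones = p | 'phones' >> beam.Create([('marco', '1234'), ('julia', '5678')])

    ({'emails': emails, 'phones': phones}
        | beam.CoGroupByKey()                # ('marco', {'emails': [...], 'phones': [...]})
        | beam.Map(print))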

Side Inputs

Additional data provided to a DoFn object. Can be provided to ParDo or its derived transforms (Map, FlatMap).
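A sketch using a side input with Filter; beam.pvalue.AsList makes the side PCollection available to every element:

import apache_beam as beam

with beam.Pipeline() as p:
    departments = p | 'main' >> beam.Create(['accounts', 'hr', 'it'])
    excluded = p | 'side' >> beam.Create(['hr'])

    (departments
        | beam.Filter(lambda dept, skip: dept not in skip,
                      skip=beam.pvalue.AsList(excluded))
        | beam.Map(print))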

Data Encoding

Coders encode and decode the elements of a PCollection. Coders do not necessarily have a 1-to-1 relationship with types; there can be multiple coders for a single type. The coder registry maps types to their default coders.

Python type	Coder
int	VarIntCoder
float	FloatCoder
str	StrUtf8Coder
bytes	BytesCoder
Tuple	TupleCoder
from apache_beam import coders

# Display default coder
coders.registry.get_coder(int)

# Change default coder
coders.registry.register_coder(int, coders.FloatCoder)

Create Custom Coder

class CustomCoder(beam.coders.Coder):

    # Encodes the given object into a byte string (placeholder implementation)
    def encode(self, input_value):
        return b''

    # Decodes the given byte string into the corresponding object (placeholder implementation)
    def decode(self, encoded_value):
        return None

    # Ensure elements are encoded the same way on all machines. Default: pickle
    def is_deterministic(self):
        return True

    # Returns a deterministic version of this coder, or raises an error
    def as_deterministic_coder(self, step_label, error_message=None):
        if self.is_deterministic():
            return self
        else:
            raise ValueError(error_message)

    # Estimates the encoded size of the given value, in bytes
    def estimate_size(self, value):
        return len(self.encode(value))
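
The custom coder would then be registered for a specific type; MyRecord below is a hypothetical class used only for illustration:

import apache_beam as beam

class MyRecord:
    def __init__(self, name, dept):
        self.name = name
        self.dept = dept

# Use CustomCoder (defined above) for PCollections of MyRecord elements
beam.coders.registry.register_coder(MyRecord, CustomCoder)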

Type Safety - Type Hints

Prevention of type errors in a programming language.

Inline

Provided during pipeline construction (on Transforms)

evens = (
    p
    | beam.Create(['one','two','three'])
    # Declaring int inputs on a str PCollection raises a type hint error at pipeline construction
    | beam.Filter(lambda x: x % 2 == 0).with_input_types(int)
)

Outline

Provided as properties of the DoFn using decorators

@beam.typehints.with_output_types(int)
@beam.typehints.with_input_types(int)
class FilterEvensDoFn(beam.DoFn):
    def process(self, element):
        if element % 2 == 0:
            yield element
            
evens = (
    p
    | beam.Create(['1','2','3'])
    | beam.ParDo(FilterEvensDoFn())
)

Streaming Data Pipelines

PubSub Streaming Architecture

Google Cloud PubSub is a fully managed real-time messaging service which allows you to send and receive messages efficiently between applications and services. Senders and receivers of messages are decoupled.

Beam

Item	Description
Publisher	Application that creates and sends messages to a topic
Topic	Named channel to which messages are sent by publishers. Messages are kept in persistent storage until they can be passed to subscribers
Message	Combination of data and attributes that a publisher sends to a topic
Subscriber	Consumer of a topic. It creates a subscription to a particular topic
Subscription	Named resource which represents the stream of messages from a single topic. After a subscriber creates a subscription, all messages in the topic are delivered to that subscriber
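A minimal streaming sketch tying these pieces together; the project, subscription, and topic names are placeholders:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
        | beam.io.ReadFromPubSub(subscription='projects/my-project/subscriptions/my-sub')
        | beam.Map(lambda msg: msg.decode('utf-8'))                  # Pub/Sub payloads arrive as bytes
        | beam.Map(lambda line: line.upper().encode('utf-8'))
        | beam.io.WriteToPubSub('projects/my-project/topics/my-output'))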

Windows

Time Notions

Event time (when an event actually occurred) differs from processing time (when the pipeline observes it), which creates challenges with latency and incompleteness.

Beam

Time-based Windows

Tumbling

Beam
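A sketch of 60-second tumbling (fixed) windows; the data and timestamps are made up to keep the example self-contained:

import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as p:
    # Keyed, timestamped events; in a real pipeline these would come from a streaming source
    events = (
        p
        | beam.Create([('accounts', 1), ('accounts', 1), ('hr', 1)])
        | beam.Map(lambda kv: window.TimestampedValue(kv, 15))     # event-time timestamp in seconds
    )

    (events
        | beam.WindowInto(window.FixedWindows(60))                  # non-overlapping 60-second windows
        | beam.CombinePerKey(sum)
        | beam.Map(print))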

Sliding

Sliding windows can overlap; they are typically used for running calculations such as averages.

Beam
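A sketch, assuming `events` is a keyed, timestamped PCollection like the one in the tumbling-window example; each 30-second window starts every 5 seconds, so windows overlap:

import apache_beam as beam
from apache_beam import window

sliding_counts = (
    events                                                          # assumed keyed, timestamped PCollection
    | beam.WindowInto(window.SlidingWindows(size=30, period=5))
    | beam.CombinePerKey(sum)
)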

Session Windows

A window closes after a certain period of inactivity following the last occurrence of a key.

Beam
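A sketch of session windows, again assuming the `events` PCollection from above; a key's session closes after 10 minutes without new elements:

import apache_beam as beam
from apache_beam import window

session_counts = (
    events                                                          # assumed keyed, timestamped PCollection
    | beam.WindowInto(window.Sessions(600))                         # 10-minute inactivity gap
    | beam.CombinePerKey(sum)
)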

Global Windows

A single window for each key. Triggers determine when the window should be evaluated.

Beam
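A sketch of a global window with a simple trigger, assuming the `events` PCollection from above; without a trigger, a global window over an unbounded source would never emit results:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import Repeatedly, AfterCount, AccumulationMode

global_counts = (
    events                                                          # assumed keyed PCollection
    | beam.WindowInto(window.GlobalWindows(),
                      trigger=Repeatedly(AfterCount(10)),           # fire after every 10 elements
                      accumulation_mode=AccumulationMode.DISCARDING)
    | beam.CombinePerKey(sum)
)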

Watermarks

Watermarks measure progress in event time and define 'late elements'. Allowed lateness is the time by which an element can be late before it is dropped.

Beam
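A sketch of allowed lateness combined with a late-firing trigger, assuming the `events` PCollection from above; elements arriving more than 120 seconds after the watermark passes the end of the window are dropped:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterCount, AccumulationMode

late_tolerant_counts = (
    events                                                          # assumed keyed, timestamped PCollection
    | beam.WindowInto(window.FixedWindows(60),
                      trigger=AfterWatermark(late=AfterCount(1)),   # re-fire for each late element
                      allowed_lateness=120,                         # seconds
                      accumulation_mode=AccumulationMode.ACCUMULATING)
    | beam.CombinePerKey(sum)
)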

Triggers

Beam

Composite Triggers
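A sketch of a composite trigger, assuming the `events` PCollection from above; the window fires repeatedly whenever either 100 elements have arrived or 60 seconds of processing time have passed:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    Repeatedly, AfterAny, AfterCount, AfterProcessingTime, AccumulationMode)

composite_counts = (
    events                                                          # assumed keyed PCollection
    | beam.WindowInto(window.GlobalWindows(),
                      trigger=Repeatedly(AfterAny(AfterCount(100),
                                                  AfterProcessingTime(60))),
                      accumulation_mode=AccumulationMode.DISCARDING)
    | beam.CombinePerKey(sum)
)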

Case Study Game Development

Beam

Deploy Beam Pipeline Dataflow

General Pipeline

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import argparse

parser = argparse.ArgumentParser()

parser.add_argument('--input', dest='input', required=True, help='Input file to process')
parser.add_argument('--output', dest='output', required=True, help='Output file to write results to.')

path_args, pipeline_args = parser.parse_known_args()

inputs_pattern = path_args.input
outputs_prefix = path_args.output

options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=options)

attendance_count = (
    p
    | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
    | 'Split row' >> beam.Map(lambda record: record.split(','))
    | 'Get all Accounts Dept Persons' >> beam.Filter(lambda record: record[3] == 'Accounts')
    | 'Pair each employee with 1' >> beam.Map(lambda record: (record[1],1))
    | 'Group and sum' >> beam.CombinePerKey(sum)
    | 'Format results' >> beam.Map(lambda employee_count: str(employee_count))
    | 'Write results' >> beam.io.WriteToText(outputs_prefix)
)

p.run()

Call Pipeline on GCP

https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python

gcp > Select Project

Enable APIs

Create Service Account

gcp > New Service Account > beam > Owner

Set Environment Variable

set GOOGLE_APPLICATION_CREDENTIALS=C:\Users\Downloads\Beam-asdfasdf.json

Create Cloud Storage Bucket

gcp > Cloud Storage > Create Bucket > demo_files1, demo_temp1

Upload pipeline and data to bucket

Run Pipeline

$ python attendance.py \
    --input gs://demo_files1/dept_data.txt \
    --output gs://demo_files1/outputs/part \
    --runner DataflowRunner \
    --project myproject \
    --temp_location gs://demo_temp1/tmp