https://www.udemy.com/share/102dea3@eL-yo5Kxb3rwTFhsHjhoKrSfdMPGMf6NtzfCEg1P1-m11kFEWjMTBIV9gGxDt792/
Unified programming model for efficient and portable big data processing pipelines. Beam can process both batch and streaming data (Batch + Stream = Beam). A Beam pipeline can be written in languages such as Java, Python, or Go, and can run on any of the supported execution frameworks such as Spark, Flink, Apex, or Cloud Dataflow.
Input > Transform > PCollection > Transform > PCollection > Transform > Output
Input > Read > P1 > Filter > P2 > GroupBy > P3 > Sum > Output
import apache_beam as beam
p1 = beam.Pipeline()
attendance_count = (
p1
| beam.io.ReadFromText('dept_data.txt')
| beam.Map(lambda record: record.split(','))
| beam.Filter(lambda record: record[3] == 'accounts')
| beam.Map(lambda record: (record[1],1))
| beam.CombinePerKey(sum)
| beam.io.WriteToText('data/output_new_final')
)
p1.run()
Google Colab: https://colab.research.google.com/?utm_source=scs-index
A ParDo transform takes each element of the input PCollection, applies a processing function to it, and emits 0, 1, or multiple elements. By default, ParDo can return multiple elements per input (like FlatMap).
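A minimal ParDo sketch (the DoFn name and data are made up, not from the course): the DoFn splits each line on commas and emits one output element per field, so a single input may produce zero, one, or many outputs.

import apache_beam as beam

# Hypothetical DoFn: emit one element per comma-separated field
class SplitFieldsFn(beam.DoFn):
    def process(self, element):
        for field in element.split(','):
            yield field

with beam.Pipeline() as p:
    (
        p
        | beam.Create(['a,b,c', 'd'])
        | beam.ParDo(SplitFieldsFn())
        | beam.Map(print)
    )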
Relational join of two or more key/value PCollections
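A small CoGroupByKey sketch (the employee data is invented): both input PCollections are keyed by the same id, and CoGroupByKey groups the values from each side under that key.

import apache_beam as beam

with beam.Pipeline() as p:
    emails = p | 'emails' >> beam.Create([('amy', 'amy@example.com'), ('carl', 'carl@example.com')])
    phones = p | 'phones' >> beam.Create([('amy', '555-0100'), ('james', '555-0200')])

    # Result elements look like ('amy', {'emails': [...], 'phones': [...]})
    (
        {'emails': emails, 'phones': phones}
        | beam.CoGroupByKey()
        | beam.Map(print)
    )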
Additional data provided to a DoFn object. Can be provided to ParDo or its derived transforms (Map, FlatMap).
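A minimal side-input sketch (the department names are placeholders): the smaller PCollection is materialised with beam.pvalue.AsList and passed as an extra keyword argument to the transform.

import apache_beam as beam

with beam.Pipeline() as p:
    main = p | 'main' >> beam.Create(['accounts', 'hr', 'finance'])
    allowed = p | 'side' >> beam.Create(['accounts', 'finance'])

    # 'keep' is filled from the side input at runtime
    (
        main
        | beam.Filter(lambda dept, keep: dept in keep, keep=beam.pvalue.AsList(allowed))
        | beam.Map(print)
    )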
Coders encode and decode the elements of a PCollection. Coders do not necessarily have a 1-to-1 relationship with types; there can be multiple coders for a single type. The coder registry maps types to their default coders.
Python | Coder |
---|---|
int | VarIntCoder |
float | FloatCoder |
str | StrUtf8Coder |
bytes | BytesCoder |
Tuple | TupleCoder |
from apache_beam import coders
# Display default coder
coders.registry.get_coder(int)
# Change default coder
coders.registry.register_coder(int, coders.FloatCoder)
class CustomCoder(beam.coders.Coder):
    # Encodes the given object into a byte string (here: a str as UTF-8)
    def encode(self, input_value):
        return input_value.encode('utf-8')

    # Decodes the given byte string into the corresponding object
    def decode(self, encoded_value):
        return encoded_value.decode('utf-8')

    # Ensure elements are encoded the same way on all machines (default is pickle)
    def is_deterministic(self):
        return True

    # Return a deterministic version of this coder, or raise if there is none
    def as_deterministic_coder(self, step_label, error_message=None):
        if self.is_deterministic():
            return self
        else:
            raise ValueError(error_message)

    # Estimates the encoded size of the given value, in bytes
    def estimate_size(self, value):
        return len(self.encode(value))
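Once the custom coder is implemented, it can be registered for a type so Beam uses it instead of the default coder; registering it for str here is just an illustration, any type would work.

import apache_beam as beam

# Use CustomCoder (above) as the default coder for str elements
beam.coders.registry.register_coder(str, CustomCoder)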
Prevention of type errors in a programming language.
Special TypeHint: Special Types like Any, …
evens = ( p | beam.Create(['one','two','three']) | beam.Filter(lambda x: x%2 == 0) )
Provided during pipeline construction (on Transforms)
evens = (
p
| beam.Create(['one','two','three'])
| beam.Filter(lambda x:x%2 == 0).with_input_types(int)
)
Provided as properties of the DoFn using decorators
@beam.typehints.with_output_types(int)
@beam.typehints.with_input_types(int)
class FilterEvensDoFn(beam.DoFn):
    def process(self, element):
        if element % 2 == 0:
            yield element
evens = (
p
| beam.Create(['1','2','3'])
| beam.ParDo(FilterEvensDoFn())
)
Google Cloud Pub/Sub is a fully managed real-time messaging service that allows you to send and receive messages efficiently between applications and services. Senders and receivers of messages are decoupled.
Item | Description |
---|---|
Publisher | Application that creates and sends messages to a topic |
Topic | Named channel to which messages are sent by publishers. It is stored in a persistent storage where it can be passed to the subscriber |
Message | Combination of data and attributes that a publisher sends to a topic |
Subscriber | Consumer of the topic. It creates a subscription to a particular topic |
Subscription | Named resource which represents the stream of messages from a single topic. After a subscriber creates a subscription, all the messages in the topic are delivered to the subscriber |
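A minimal streaming-read sketch (the project and subscription names are placeholders): Pub/Sub messages arrive as bytes, so the payload is decoded before further processing, and streaming mode must be enabled because the source is unbounded.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub sources are unbounded

with beam.Pipeline(options=options) as p:
    (
        p
        | beam.io.ReadFromPubSub(subscription='projects/myproject/subscriptions/mysub')
        | beam.Map(lambda msg: msg.decode('utf-8'))
        | beam.Map(print)
    )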
Challenges with latency and incompleteness.
Sliding windows can overlap. Useful for calculating moving averages.
The window closes after a certain period of inactivity following the last occurrence of the key (session windows).
A single window for each key. Triggers determine when the window's contents should be emitted.
Watermarks measure progress in event time and define 'late elements'. Allowed lateness is the time by which an element can be late before it is dropped.
Finally: a final condition that causes any trigger to fire one final time.
pcollection | beam.WindowInto(FixedWindows(60), trigger=Repeatedly(AfterAny(AfterCount(50), AfterProcessingTime(20))), accumulation_mode=AccumulationMode.DISCARDING)
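The same windowing configuration as a fuller sketch with the imports it needs (pcollection stands for any upstream PCollection): 60-second fixed windows that fire whenever 50 elements arrive or 20 seconds of processing time pass, discarding panes that have already fired.

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import (
    Repeatedly, AfterAny, AfterCount, AfterProcessingTime, AccumulationMode)

windowed = (
    pcollection
    | beam.WindowInto(
        FixedWindows(60),
        trigger=Repeatedly(AfterAny(AfterCount(50), AfterProcessingTime(20))),
        accumulation_mode=AccumulationMode.DISCARDING)
)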
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--input', dest='input', required=True, help='Input file to process.')
parser.add_argument('--output', dest='output', required=True, help='Output file to write results to.')
path_args, pipeline_args = parser.parse_known_args()
inputs_pattern = path_args.input
outputs_prefix = path_args.output
options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=options)
attendance_count = (
p
| 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
| 'Split row' >> beam.Map(lambda record: record.split(','))
| 'Get all Accounts Dept Persons' >> beam.Filter(lambda record: record[3] == 'Accounts')
| 'Pair each employee with 1' >> beam.Map(lambda record: (record[1],1))
| 'Group and sum' >> beam.CombinePerKey(sum)
| 'Format results' >> beam.Map(lambda employee_count: str(employee_count))
| 'Write results' >> beam.io.WriteToText(outputs_prefix)
)
p.run()
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python
gcp > Select Project
gcp > New Service Account > beam > Owner
set GOOGLE_APPLICATION_CREDENTIALS=C:\\Users\Downloads\Beam-asdfasdf.json
gcp > Cloud Storage > Create Bucket > demo_files1, demo_temp1
Upload pipeline and data to bucket
$ python attendance.py
--input gs://demo_files1/dept_data.txt
--output gs://demo_files1/outputs/part
--runner DataflowRunner
--project myproject
--temp_location gs://demo_temp1/tmp