 | Dataflow | Dataproc |
---|---|---|
Recommended for | New data processing pipelines, unified batch and streaming | Existing Hadoop/Spark applications, ML, large-batch jobs |
Fully managed | Yes | No |
Auto scaling | Yes (adaptive) | Yes (reactive) |
Expertise | Apache Beam | Hadoop, Hive, Pig, Spark … |
In a PCollection, all data is immutable and stored as a byte string.
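Concretely, elements cross stage boundaries as encoded bytes produced by a coder. A small illustration (using the built-in UTF-8 string coder; the round trip shown is for demonstration only):
from apache_beam import coders

coder = coders.StrUtf8Coder()
encoded = coder.encode('example')          # b'example' – elements live as encoded bytes
assert coder.decode(encoded) == 'example'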
import sys
import apache_beam as beam

if __name__ == '__main__':
    with beam.Pipeline(argv=sys.argv) as p:  # Create pipeline parameterized by command-line flags
        (p
         | beam.io.ReadFromText('gs://...')  # Read input
         | beam.FlatMap(count_words)         # Apply transform
         | beam.io.WriteToText('gs://...')   # Write output
        )
options = {'project': <project>,
'runner': 'DataflowRunner',  # Where to run
'region': <region>,
'setup_file': <setup.py file>}
pipeline_options = beam.pipeline.PipelineOptions(flags=[],**options)
pipeline = beam.Pipeline(options=pipeline_options) # Creates the pipeline
$ python ./grep.py  # Run locally with the default DirectRunner
$ python ./grep.py \
--project=$PROJECT \
--job_name=myjob \
--staging_location=gs://$BUCKET/staging/ \
--temp_location=gs://$BUCKET/tmp/ \
--runner=DataflowRunner
with beam.Pipeline(options=pipeline_options) as p:
# Cloud Storage
lines = p | beam.io.ReadFromText("gs://..../input-*.csv.gz")
# Pub/Sub
lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
# BigQuery
query = "select x, y, z from `project.dataset.tablename`"
BQ_source = beam.io.BigQuerySource(query=<query>, use_standard_sql=True)
BQ_data = pipeline | beam.io.Read(BQ_source)
from apache_beam.io.gcp.internal.clients import bigquery
table_spec = bigquery.TableReference(
projectId='clouddataflow-readonly',
datasetId='samples',
tableId='weather_stations')
p | beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
'WordLengths' >> beam.Map(lambda word: (word, len(word)))
def my_grep(line, term):
    if term in line:
        yield line

'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm))
words = ...
# Do Fn
class ComputeWordLengthFn(beam.DoFn):
def process(self,element):
return [len(element)]
# ParDo
word_lengths = words | beam.ParDo(ComputeWordLengthFn())
results = (words | beam.ParDo(ProcessWords(),
cutoff_length=2, marker='x')
.with_outputs('above_cutoff_lengths', 'marked strings', main='below_cutoff_strings'))
below = results.below_cutoff_strings
above = results.above_cutoff_strings
marked = results['marked strings']
Data skew makes grouping less efficient at scale.
totalAmount = salesAmounts | CombineGlobally(sum)
totalSalesPerPerson = salesRecords | CombinePerKey(sum)
To build a custom CombineFn, you must provide four functions:
class AverageFn(beam.CombineFn):
def create_accumulator(self):
return(0.0,0)
def add_input(self, sum_count, input):
(sum, count) = sum_count
return sum + input, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
def extract_output(self, sum_count):
(sum, count) = sum_count
return sum / count if count else float('NaN')
pc = ...
average = pc | beam.CombineGlobally(AverageFn())
Combine is more efficient than GroupByKey.
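As a minimal sketch of the difference (the (key, amount) element shape and the names below are assumed for illustration): CombinePerKey can pre-combine partial sums on each worker before the shuffle, whereas GroupByKey ships every value for a key through the shuffle first.
import apache_beam as beam

with beam.Pipeline() as p:
    sales = p | beam.Create([('alice', 10), ('bob', 5), ('alice', 7)])
    # GroupByKey: every value for a key is shuffled, then summed downstream.
    sums_gbk = (sales
                | 'GBK' >> beam.GroupByKey()
                | 'SumValues' >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))
    # CombinePerKey: partial sums are combined before and after the shuffle.
    sums_combine = sales | 'CombinePerKey' >> beam.CombinePerKey(sum)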
A side input is an additional input that a DoFn can access each time it processes an element of the input PCollection.
words = ...
def filter_using_length(word, lower_bound, upper_bound=float('inf')):
if lower_bound <= len(word) <= upper_bound:
yield word
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)
# Side input
avg_word_len = (words
| beam.Map(len)
| beam.CombineGlobally(beam.combiners.MeanCombineFn()))
larger_than_average = (words | 'large' >> beam.FlatMap(
filter_using_length,
lower_bound=beam.pvalue.AsSingleton(avg_word_len)))
Global aggregations over an unbounded PCollection are not useful for streaming data.
Use time windows instead.
lines = p | 'Create' >> beam.io.ReadFromText('access.log')
windowed_counts = (
lines
| 'Timestamp' >> beam.Map(lambda x: beam.window.TimestampedValue(x, extract_timestamp(x)))
| 'Window' >> beam.WindowInto(beam.window.SlidingWindows(60, 30))
| 'Count' >> (beam.CombineGlobally(beam.combiners.CountCombineFn()).without_defaults())
)
windowed_counts = windowed_counts | beam.ParDo(PrintWindowFn())
Templates separate the developer from the user.
Each template has metadata:
Timestamps can be modified
unix_timestamp = extract_timestamp_from_log_entry(element)
yield beam.window.TimestampedValue(element, unix_timestamp)
msg.publish(event_data, myid="34xwy57223cdg")
p.apply(PubsubIO.readStrings().fromTopic(t).idLabel("myid"))
from apache_beam import window
fixed_window_items = (items | 'window' >> beam.WindowInto(window.FixedWindows(60)))
from apache_beam import window
sliding_window_items = (items | 'window' >> beam.WindowInto(window.SlidingWindows(30,5)))
from apache_beam import window
session_window_items = (items | 'window' >> beam.WindowInto(window.Sessions(10*60)))
Late messages won’t be processed. You can decide to re-read the dataset.
pc = [Initial PCollection]
pc | beam.WindowInto(
FixedWindows(60),
trigger=trigger_fn,
accumulation_mode=accumulation_mode,
timestamp_combiner=timestamp_combiner,
allowed_lateness=Duration(seconds=2*24*60*60)) # 2 days
The Beam portability framework provides a portable execution layer for data pipelines and enables custom worker containers.
FROM apache/beam_python3.8_sdk:2.25.0
ENV MY_FILE_NAME=my_file.txt
COPY path/to/myfile/$MY_FILE_NAME ./
$ export PROJECT=my-project-id
$ export REPO=my-repository
$ export TAG=my-image-tag
$ export REGISTRY_HOST=gcr.io
$ export IMAGE_URI=$REGISTRY_HOST/$PROJECT/$REPO:$TAG
$ gcloud builds submit --tag $IMAGE_URI          # Build with Cloud Build, or
$ docker build -f Dockerfile -t $IMAGE_URI ./    # build locally with Docker
$ docker push $IMAGE_URI
$ python my-pipeline.py \
--input=INPUT_FILE \
--output=OUTPUT_FILE \
--project=PROJECT_ID \
--region=REGION \
--temp_location=TEMP_LOCATION \
--runner=DataflowRunner \
--worker_harness_container_image=$IMAGE_URI
from apache_beam.io.kafka import ReadFromKafka
with beam.Pipeline(options=<Your Beam PipelineOptions object>) as p:
(p
| ReadFromKafka(
consumer_config={'bootstrap.servers': '<Kafka bootstrap servers list>'},
topics=[<List of Kafka topics>]))
Dataflow allows executing Apache Beam Pipelines on Google Cloud.
Dataflow Shuffle Service: batch pipelines only; faster execution time.
Dataflow Streaming Engine: for streaming data pipelines; uses less worker CPU and memory.
FlexRS (Flexible Resource Scheduling): reduces the cost of batch processing pipelines; execution starts within 6 hours; for non-time-critical workloads.
CPU quota: total number of CPUs consumed in a region (gcp > IAM > Quota).
IP quota: total number of VMs with an external IP address in a region.
Persistent disks: either HDD or SSD; set the --worker_disk_type flag (pd-ssd).
Regional endpoints: ensure all data and metadata stay in one region.
Customer-Managed Encryption Keys (CMEK): by default, Google manages key encryption. Customers can instead use symmetric CMEK keys stored in the Google Cloud managed key service (Cloud KMS). Job metadata is still encrypted with Google-managed encryption.
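A hedged sketch of how these service features map to Python pipeline flags; the flag names below are assumptions based on common Dataflow options and the values are placeholders, so verify them against the current documentation:
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed flag names; values are placeholders.
options = PipelineOptions(flags=[
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=europe-west1',          # regional endpoint: data and metadata stay in one region
    '--flexrs_goal=COST_OPTIMIZED',   # FlexRS for non-time-critical batch jobs
    '--worker_disk_type=pd-ssd',      # HDD vs SSD worker disks
    '--no_use_public_ips',            # workers without external IP addresses
    '--dataflow_kms_key=projects/my-project/locations/global/keyRings/my-ring/cryptoKeys/my-key',  # CMEK
])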
Start Dataflow Pipeline
Dataflow Pipeline Coding Examples
Transformation | Description |
---|---|
ParDo | The most-general mechanism for applying a user-defined DoFn to every element in the input collection. |
Combine | Transforms to combine elements. |
GroupByKey | Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. |
CoGroupByKey | Takes several keyed collections of elements and produces a collection where each element consists of a key and all values associated with that key. |
Flatten | Given multiple input collections, produces a single output collection containing all elements from all of the input collections. |
Partition | Routes each input element to a specific output collection based on some partition function. |
Python Transformation Catalogue
class MyDoFn(beam.DoFn):
def setup(self):
pass
def start_bundle(self):
pass
def process(self,element):
pass
def finish_bundle(self):
pass
def teardown(self):
pass
Out-of-order stream: data is only late when it exceeds the watermark.
Triggers decide when to close a window, even if late data has not yet arrived.
pcollection | WindowInto(
SlidingWindows(60,5),
trigger=AfterWatermark(
early=AfterProcessingTime(delay=30),
late=AfterCount(1)),
accumulation_mode=AccumulationMode.ACCUMULATING)
pcollection | WindowInto(
FixedWindows(60),
trigger=Repeatedly(
AfterAny(
AfterCount(100),
AfterProcessingTime(1*60))),
accumulation_mode=AccumulationMode.DISCARDING,
allowed_lateness=Duration(seconds=2*24*60*60))
Java
@AutoValue
public abstract static class Write<T> extends
PTransform<PCollection<T>, WriteResult> {
Python
class WriteToPubSub(PTransform):
pipeline
.apply(
"Read from source",
TextIO
.read()
.from(options
.getInputFilePattern()))
p.apply(
FileIO
.match()
.filepattern("hdfs://path/to/*.gz"))
.apply(
FileIO
.readMatches().withCompression(Compression.GZIP))
.apply(
ParDo.of(
new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void process(
@Element FileIO.ReadableFile file) {
LOG.info("File Metadata resourceId is {} ",
file.getMetadata().resourceId());
}
}));
p.apply(
FileIO.match()
.filepattern("...")
.continuously (
Duration.standardSeconds(30),
Watch.Growth.afterTimeSinceNewOutput(
Duration.standardHours(1))));
PCollection<Row> records1 =
p.apply(ContextualTextIO.read().from("..."));
PCollection<Row> records2 =
p.apply(ContextualTextIO.read()
.from("/local/path/to/files/*.csv")
.withHasMultilineCSVRecords(true));
PCollection<Row> records3 =
p.apply(ContextualTextIO.read()
.from("/local/path/to/files/*")
.watchForNewFiles(
Duration.standardMinutes(1),
afterTimeSinceNewOutput(
Duration.standardHours(1))));
pcoll1 = (pipeline
| 'Create' >> Create([file_name])
| 'ReadAll' >> ReadAllFromText())
pcoll2 = pipeline | 'Read' >> ReadFromText(file_name)
with beam.Pipeline() as p:
readable_files = (
p
| fileio.MatchFiles('hdfs://path/to/*.txt')
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (
readable_files
| beam.Map(lambda x: (x.metadata.path,
x.read_utf8())))
with beam.Pipeline() as p:
readable_files = (
p
| beam.io.ReadFromPubSub(...)
... #<Parse PubSub Message and Yield Filename>
)
files_and_contents = (
readable_files
| ReadAllFromText())
csv.apply(
"Write to storage",
TextIO
.write()
.to(options
.getTextWritePrefix())
.withSuffix(".csv"));
PCollection<BankTransaction> transactions = ...;
transactions.apply(FileIO.<TransactionType,
Transaction>writeDynamic()
.by(Transaction::getTypeName)
.via(tx -> tx.getTypeName().toFields(tx),
type -> new CSVSink(type.getFieldNames()))
.to(".../path/to/")
.withNaming(type -> defaultNaming(
type + "-transactions", ".csv")));
(transformed_data
| 'write' >> WriteToText(
known_args.output, coder=JsonCoder()))
(my_pcollection
| beam.io.fileio.WriteToFiles(
path='/my/file/path',
destination=lambda record: 'avro'
if record['type'] == 'A' else 'csv',
sink=lambda dest: AvroSink()
if dest == 'avro' else CsvSink(),
file_naming = beam.io.fileio
.destination_prefix_naming()))
### BigQuery IO
#### Reading
##### Java
PCollection<Double> maxTemperatures =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> (Double)
elem.getRecord()
.get("max_temperature"))
.fromQuery(
"select max_temperature from "
+ "`clouddataflow-readonly.samples.weather_stations`")
.usingStandardSql()
.withCoder(DoubleCoder.of()));
PCollection<MyData> rows =
pipeline.apply("Read from BigQuery table",
BigQueryIO.readTableRows()
.from(
String.format("%s:%s.%s",
project, dataset, table))
.withMethod(Method.DIRECT_READ)
//.withRowRestriction
.withSelectedFields(
Arrays.asList(..."string_...","Int64...")))
.apply("TableRows to MyData",
MapElements.into(
TypeDescriptor.of(MyData.class))
.via(MyData::fromTableRow))
max_temperatures = (
p
| 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
query = 'select max_temperature from '\
'`clouddataflow-readonly.samples.weather_stations`',
use_standard_sql=True)
| beam.Map(lambda elem: elem['max_temperature']))
pc.apply(BigQueryIO.<Purchase>write(tableSpec)
.useBeamSchema()
.to((ValueInSingleWindow<Purchase> purchase) -> {
return new TableDestination(
"project:dataset-" +
purchase.getValue().getUser() +
":purchases","");
}));
def table_fn(element, fictional_characters):
if element in fictional_characters:
return 'my_dataset.fictional_quotes'
else:
return 'my_dataset.real_quotes'
quotes | 'WriteWithDynamicDestination' >>
beam.io.WriteToBigQuery(
table_fn,
schema=table_schema,
table_side_inputs=(fictional_characters_view, ),
...)
pipeline
.apply("Read PubSub Messages",
PubsubIO
.readStrings()
.fromTopic(options.getInputTopic()))
.apply(
Window.into(
FixedWindows.of(
Duration.standardMinutes(
options.getWindowSize()))));
class GroupWindowsIntoBatches(beam.PTransform):
...
>> beam.WindowInto(
window.FixedWindows(self.window_size))
pipeline
| "Read PubSub Messages"
>> beam.io.ReadFromPubSub(topic=input_topic)
| "Window into"
>> GroupWindowsIntoBatches(window_size)
PCollection<KV<String, String>> records =
pipeline
.apply("Read From Kafka",
KafkaIO.<String, String>read()
.withConsumerConfigUpdates(ImmutableMap.of(
ConsumerConfig
.AUTO_OFFSET_RESET_CONFIG, "earliest"))
.withBootstrapServers(options.getBootstrapServers())
.withTopics(<...list...>)
.withKeyDeserializerAndCoder(...)
.withValueDeserializerAndCoder(...)
.withoutMetadata());
pipeline
| ReadFromKafka(
consumer_config={
'bootstrap.servers':bootstrap_servers},
topics=[topic])
p.apply("filtered read",
BigtableIO.read()
.withProjectId(projectId)
.withInstanceId(instanceId)
.withTableId("table")
.withRowFilter(filter));
ByteKeyRange keyRange = ...;
p.apply("read",
BigtableIO.read()
.withProjectId(projectId)
.withInstanceId(instanceId)
.withTableId("table")
.withKeyRange(keyRange));
PCollection<KV<..., Iterable<Mutation>>> data = ...;
PCollection<BigtableWriteResult> writeResults =
data.apply("write", BigtableIO.write()
.withProjectId("project")
.withInstanceId("instance")
.withTableId("table")
.withWriteResults());
PCollection<...> moreData = ...;
moreData
.apply("wait for writes", Wait.on(writeResults))
.apply("do something", ParDo.of(...))
PCollection<AvroAutoGenClass> records =
p.apply(AvroIO.read(AvroAutoGenClass.class)
.from("gs:...*.avro"));
Schema schema = new Schema.Parser()
.parse(new File("schema.avsc"));
PCollection<GenericRecord> records =
p.apply(AvroIO.readGenericRecords(schema)
.from("gs:...-*.avro"));
with beam.Pipeline() as p:
records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*')
@BoundedPerElement
private static class FileToWordsFn extends DoFn<String,Integer> {
@GetInitialRestriction
public OffsetRange getInitialRestriction(
@Element String fileName) throws IOException {
return new OffsetRange(0,
new File(fileName).length());
}
@ProcessElement
public void processElement(
@Element String fileName,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<Integer> outputReceiver) {...}
}
class FileToWordsRestrictionProvider(
beam.io.RestrictionProvider):
def initial_restriction(self, file_name):
return OffsetRange(0,os.stat(file_name).st_size)
def create_tracker(self, restriction):
return beam.io.restriction_trackers.OffsetRestrictionTracker(restriction)
class FileToWordsFn(beam.DoFn):
def process(...):
purchases.apply(Filter.by(purchase -> {
return purchase.location.lat < 40.720 && purchase.location.lat > 40.699
&& purchase.location.lon < -73.969 && purchase.location.lon > -74.747}));
purchases.apply(
Filter.whereFieldName("location.lat", (double lat) -> lat < 40.720 && lat > 40.699)
.whereFieldName("location.lon", (double lon) -> lon < -73.969 && lon > -74.747));
PCollection<UserPurchases> userSums =
purchases.apply(Join.innerJoin(transactions).using("transactionId"))
.apply(Select.fieldNames("lhs.userId","rhs.totalPurchase"))
.apply(Group.byField("userId").aggregateField(Sum.ofLongs(),"totalPurchase"));
Type | Strength | Dataflow |
---|---|---|
Value | Read/write any value | yes |
Bag | Cheap append no ordering on read | yes |
Combining | Associative/commutative compaction | yes |
Map | Read/write just keys you specify | yes |
Set | Membership checking | no |
class StatefulBufferingFn(beam.DoFn):
MAX_BUFFER_SIZE = 500
BUFFER_STATE = BagStateSpec('buffer', EventCoder())
COUNT_STATE = CombiningValueStateSpec('count',VarIntCoder(),combiners.SumCombineFn())
def process(self, element,
buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
count_state=beam.DoFn.StateParam(COUNT_STATE)):
buffer_state.add(element)
count_state.add(1)
count=count_state.read()
if count >= StatefulBufferingFn.MAX_BUFFER_SIZE:
for event in buffer_state.read():
yield event
count_state.clear()
buffer_state.clear()
? What happens with the last Buffer, if it has not got enough messages to be cleared?
class StatefulBufferingFn(beam.DoFn):
...
EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
def process(self, element,
w = beam.DoFn.WindowParam,
...
expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
expiry_timer.set(w.end + ALLOWED_LATENESS)
@on_timer(EXPIRY_TIMER)
def expiry(self,
buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
count_state=beam.DoFn.StateParam(COUNT_STATE)):
events = buffer_state.read()
for event in events:
yield event
buffer_state.clear()
count_state.clear()
final TupleTag successTag;
final TupleTag deadLetterTag;
PCollection input = /* ... */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (MyException ex) {
// Optional: logging at debug level
c.output(deadLetterTag, c.element());
}
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write dead-letter elements to a separate sink
outputTuple.get(deadLetterTag).apply(BigQueryIO.write(...));
// Process the successful elements differently
PCollection success = outputTuple.get(successTag);
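A comparable Python sketch of the dead-letter pattern using tagged outputs (the input PCollection lines of JSON strings and the ParseJsonFn name are assumptions for illustration):
import json
import apache_beam as beam

class ParseJsonFn(beam.DoFn):
    def process(self, element):
        try:
            yield json.loads(element)
        except ValueError:
            # Route unparseable records to a dead-letter output instead of failing the bundle.
            yield beam.pvalue.TaggedOutput('dead_letter', element)

results = lines | beam.ParDo(ParseJsonFn()).with_outputs('dead_letter', main='parsed')
parsed = results.parsed
dead_letters = results.dead_letter   # e.g. write these to BigQuery or GCS for inspection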
PCollection<MyUserType> users = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class));
public class External extends DoFn {
@Override
public void startBundle() {
// Instantiate your external service client (static if thread-safe)
}
@Override
public void processElement() {
// Call out to the external service
}
@Override
public void finishBundle() {
// Shut down your external service client if needed
}
}
class MyDoFn(beam.DoFn):
def setup(self):
pass
def start_bundle(self):
pass
def process(self,element):
pass
def finish_bundle(self):
pass
def teardown(self):
pass
Providing a schema enables SQL API.
Interface | Use case | User |
---|---|---|
BigQuery UI | Analytical queries over historical data | Data analyst |
Dataflow SQL UI | Analytical queries over real-time data | Data analyst |
Beam SQL | Integrating SQL within a Beam pipeline | Data engineer |
Beam SQL supports two dialects: Apache Calcite and ZetaSQL.
$ gcloud dataflow sql query 'select sum(foo) as baz, end_of_window from my_topic where something_is_true(bizzle) group by tumbling(timestamp, 1 hour)'
String sql1 = "select my_func(c1), c2 from pcollection";
PCollection<Row> outputStream = inputStream.apply(SqlTransform.query(sql1));
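A rough Python counterpart using SqlTransform over a schema'd PCollection of beam.Row values (a sketch only; SqlTransform is a cross-language transform and needs a Java expansion service available at runtime, and the field names are placeholders):
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

with beam.Pipeline() as p:
    rows = p | beam.Create([
        beam.Row(productId='p1', amount=3),
        beam.Row(productId='p2', amount=5),
    ])
    # The unnamed input PCollection is addressed as PCOLLECTION in the query.
    totals = rows | SqlTransform(
        "SELECT productId, SUM(amount) AS total FROM PCOLLECTION GROUP BY productId")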
select
productId,
tumble_start("INTERVAL 10 SECOND") as period_start, count(transactionId) as num_purchases
from
pubsub.topic.`instant-insights`.`retaildemo-online-purchases-json` as pr
group by
productId,
tumble(pr.event_timestamp, "INTERVAL 10 SECOND");
select
productId,
hop_start("INTERVAL 10 SECOND","INTERVAL 30 SECOND") as period_start,
hop_end("INTERVAL 10 SECOND","INTERVAL 30 SECOND") as period_end,
count(transactionId) as num_purchases
from
pubsub.topic.`instant-insights`.`retaildemo-online-purchases-json` as pr
group by
productId,
hop(pr.event_timestamp,"INTERVAL 10 SECOND","INTERVAL 30 SECOND");
select
userId,
session_start("INTERVAL 10 MINUTE") as interval_start,
session_end("INTERVAL 10 MINUTE") as interval_end,
count(transactionId) as num_transactions
from
pubsub.topic.`instant-insights`.`retaildemo-online-purchases-json` as pr
group by
userId,
session(pr.event_timestamp, "INTERVAL 10 MINUTE");
pc = p | beam.Create(['strawberry', 'raspberry', 'blackberry', 'blueberry', 'banana'])
pc | beam.GroupBy(lambda name: name[0])
from apache_beam.dataframe.transforms import DataframeTransform

def my_function(df):
df['C'] = df.A + 2*df.B
result = df.groupby('C').sum().filter('A < 0')
return result
output = input | DataframeTransform(my_function)
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as p:
pc1 = ...
pc2 = ...
df1 = to_dataframe(pc1)
df2 = to_dataframe(pc2)
...
result = ...
result_pc = to_pcollection(result)
result_pc | beam.io.WriteToText(...)
words = (
lines
| 'Split' >> beam.FlatMap(
lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
| 'ToRows' >> beam.Map(lambda word: beam.Row(word=word)))
df = to_dataframe(words)
df['count'] = 1
counted = df.groupby('word').sum()
counted.to_csv(known_args.output)
counted_pc = to_pcollection(counted, include_indexes=True)
_ = (
counted_pc
| beam.Filter(lambda row: row.count > 50)
| beam.Map(lambda row: f'{row.word}:{row.count}')
| beam.Map(print))
gcp > Dataflow > Notebooks
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
from apache_beam.options import pipeline_options
from apache_beam.runners import DataflowRunner
options = pipeline_options.PipelineOptions()
runner = DataflowRunner()
runner.run_pipeline(p,options=options)
The filtered view is saved in the URL, so the link can be copied and pasted to share it.
Filter early in the pipeline:
Read > Filter > Window > GBK (see the sketch below)
Encoding and decoding cause large overheads.
Fan-out transformations: a single element outputs multiple elements. To avoid fusion, pass the intermediate PCollection as a side input.
Let the Dataflow service log hot keys with "hotKeyLoggingEnabled".
Switch to: AvroIO, TextIO + uncompressed files
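A minimal sketch of the filter-early ordering referenced above (p, input_topic, is_relevant and key_of are assumed placeholders):
import apache_beam as beam
from apache_beam.transforms import window

filtered_counts = (p
    | 'Read' >> beam.io.ReadFromPubSub(topic=input_topic)
    | 'FilterEarly' >> beam.Filter(is_relevant)              # drop elements before any shuffle
    | 'Window' >> beam.WindowInto(window.FixedWindows(60))
    | 'Key' >> beam.Map(lambda e: (key_of(e), e))
    | 'GBK' >> beam.GroupByKey())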
JUnit 4 for unit testing
@Rule public TestPipeline p = TestPipeline.create();
@Test
public void testASingleTransform() {
// Setup your PCollection
// from an in-memory or local data source.
…
// Apply your transform.
PCollection<String> output = input.apply(...);
// Setup assertions on the pipeline.
...
p.run(); }
Do not use anonymous DoFns; prefer named subclasses.
@Rule
public final transient TestPipeline p = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
final PCollection<String> pcol = p.apply(...);
PAssert.that(pcol).containsInAnyOrder(...);
p.run();
}
with TestPipeline() as p:
...
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
output = ...
# Check whether a PCollection
# contains some elements in any order.
assert_that(
output,
equal_to(["elem1","elem3","elem2"]))
@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
PCollection<String> input =
p.apply(Create.timestamped(
TimestampedValue.of("a", new Instant(0L)),
TimestampedValue.of("b", new Instant(0L)),
TimestampedValue.of("c", new Instant(0L)),
TimestampedValue.of("c", new Instant(0L)
.plus(WINDOW_DURATION))
.withCoder(StringUtf8Coder.of()));
PCollection<KV<String, Long>> windowedCount = input.apply(
Window.into(FixedWindows.of(WINDOW_DURATION))).apply(Count.perElement());
PAssert.that(windowedCount).containsInAnyOrder(
// Output from first window
KV.of("a", 2L),
KV.of("b", 1L),
KV.of("c", 1L),
KV.of("c", 1L));
p.run();
}
TestStream is a testing input that:
Stops producing output after all specified elements are emitted
@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
TestStream
PCollection<KV<String, Long>> windowedCount = ...
.withCoder(StringUtf8Coder.of()));
PAssert.that(windowedCount).containsInAnyOrder(
// Output from first window
KV.of("a", 2L),
KV.of("b", 1L),
KV.of("c", 1L));
p.run(); }
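A Python sketch of the same idea with TestStream; the window size, elements and watermark steps are assumptions for illustration:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import window

events = (TestStream()
          .advance_watermark_to(0)
          .add_elements(['a', 'a', 'b'])
          .advance_watermark_to(60)        # the first 60 s window closes here
          .add_elements(['late'])          # arrives after the watermark has passed
          .advance_watermark_to_infinity())

opts = PipelineOptions()
opts.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=opts) as p:
    counts = (p
              | events
              | beam.WindowInto(window.FixedWindows(60))
              | beam.Map(lambda e: (e, 1))
              | beam.CombinePerKey(sum))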
Test complete pipeline without sources and sinks.
Batch Pipeline: Clone production pipeline into test-pipeline.
Streaming Pipeline: Create a new subscription to the topic.
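For the streaming case, a hedged sketch of creating such a test subscription with the google-cloud-pubsub client (project, topic and subscription names are placeholders):
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path('my-project', 'prod-topic')
subscription_path = subscriber.subscription_path('my-project', 'prod-topic-test-sub')

# A separate subscription lets the test pipeline read the live topic
# without taking messages away from the production subscription.
subscriber.create_subscription(
    request={'name': subscription_path, 'topic': topic_path})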
Use Beam 2.26 or higher.
Save the state of streaming pipeline and launch new versions without losing state.
Create job from snapshot:
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://STORAGE_BUCKET/staging/ \
--inputFile=gs://apache-beam-samples/shakespeare/* \
--output=gs://STORAGE_BUCKET/output \
--runner=DataflowRunner \
--enableStreamingEngine \
--createFromSnapshot=SNAPSHOT_ID \
--region=REGION"
Replace your existing job with a new job that runs the updated pipeline code.
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--stagingLocation=gs://STORAGE_BUCKET/staging/ \
--inputFile=gs://apache-beam-samples/shakespeare/* \
--output=gs://STORAGE_BUCKET/output \
--runner=DataflowRunner \
--update \
--jobName=[prior job name] \
--transformNameMapping='{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}' \
--region=REGION"
python -m apache_beam.examples.wordcount \
--project $PROJECT \
--staging_location gs://$BUCKET/tmp/ \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://$BUCKET/results/outputs \
--runner DataflowRunner \
--update \
--job_name [prior job name] \
--transform_name_mapping='{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}' \
--region $REGION
User Code & Data Shape | Outages |
---|---|
Transient errors | Service outage |
Corrupted data | Zonal outage |
 | Regional outage |
final TupleTag successTag;
final TupleTag deadLetterTag;
PCollection input = /* ... */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (MyException ex) {
// Optional: logging at debug level
c.output(deadLetterTag, c.element());
}
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write dead-letter elements to a separate sink
outputTuple.get(deadLetterTag).apply(BigQueryIO.write(...));
// Process the successful elements differently.
PCollection success = outputTuple.get(successTag);
Streaming jobs try to re-run indefinitely.
gcloud pubsub snapshots create my-snapshot --subscription=my-sub
gcloud dataflow jobs drain [job-id]
gcloud pubsub subscriptions seek my-sub --snapshot=my-snapshot
gcloud dataflow jobs run my-job-name --gcs-location=my_gcs_bucket
{
"name":"PubSub To BigQuery",
"description":"An Apache Beam streaming pipeline that reads JSON",
"parameters":[
{
"name":"inputSubscription",
"label":"Pub/Sub input subscription",
"helpText":"Pub/Sub subscription to read from",
"regexes":["[a-zA-Z][-_.~+%]"]
},
{
"name":"outputTable",
"label":"BigQuery outputTable",
"helpText":"Write to table",
"regexes":["^:"]
}
]
}
$ gcloud dataflow flex-template build "$TEMPLATE_SPEC_PATH" \
--image-gcr-path "$TEMPLATE_IMAGE" \
--sdk-language "JAVA" \
--flex-template-base-image JAVA8 \
--metadata-file "metadata.json" \
--jar "target/pubsub-bigquery-1.0.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="com.google.cloud.PubSubBigquery"
$ gcloud dataflow flex-template run "job-name-`date +%Y%m%d-%H-%M-%S`" \
--template-file-gcs-location "$TEMPLATE_PATH" \
--parameters inputSubscription="$SUBSCRIPTION" \
--parameters outputTable="$PROJECT:$DATASET.$TABLE" \
--region "$REGION"
$ curl -X POST \
"https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/${REGION}/flexTemplates:launch" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-d '{
"launch_parameter":{
"jobName":"job-name-`date +%Y%m%d-%H%M%S`",
"parameters":{
"inputSubscription":"'$SUBSCRIPTION'",
"outputTable":"'$PROJECT:$DATASET.$TABLE'"
},
"containerSpecGcsPath":"'$TEMPLATE_PATH'"
}
}'
$ gcloud scheduler jobs create http scheduler-job --schedule="*/30 * * * *" \
--uri="https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/${REGION}/flexTemplates:launch" --http-method=POST \
--oauth-service-account-email=email@project.iam.gserviceaccount.com \
--message-body=' {
"launch_parameter":{
"jobName":"job-name",
"parameters":{
"inputSubscription":"'$SUBSCRIPTION'",
"outputTable":"'$PROJECT:$DATASET.$TABLE'"
},
"containerSpecGcsPath":"'$TEMPLATE_PATH'"
}
}'