Apache Flink

Flink is a streaming dataflow engine that provides data distribution, communication, and fault tolerance for distributed computations over data streams.

Flink provides two types of APIs for data processing:

    1. DataSet API: for bounded streams (batch processing).

    2. DataStream API: for unbounded streams.

1. Features

    • Generalized distributed data processing.

    • Real-time stream processing.

    • Low latency and high throughput.

    • Fast and reliable compared to Hadoop and Spark.

    • Exposes both Java and Scala APIs.

    • Embraces the stream as the core abstraction for implementing dataflows.

2. Differences between Flink, Spark and Storm

                      Storm              Spark              Flink
    Streaming         True               Micro-batches      True
    API               Low-level          High-level         High-level
    Fault-tolerance   Tuple-level ACK    RDD-based          Checkpoints
    State             Not built-in       External           Internal
    Guarantee         At least once      Exactly once       Exactly once
    Windowing         Not built-in       Restricted         Flexible
    Latency           Low                Medium             Low
    Throughput        Medium             High               High

Use Cases

    • Alerting users when a threshold is reached.

    • Fraud prevention in the finance sector.

    • Spam prevention.

    • Network anomaly detection.

3. Flink Stack

                                                 Figure 1: Apache Flink Stack

  • Flink is a processing framework; it concerns itself only with the processing model and how it is exposed to different domains.

  • Flink does not have its own data storage. It does not need storage-specific abstractions because it is data-source agnostic and works with any type of storage, such as Kafka, local files, JDBC connections, etc.

  • Flink provides various execution environments such as local, cluster, YARN, and cloud.

  • Flink supports both stream and batch processing. It has the special classes DataSet and DataStream to represent data in a program. Both are immutable collections of data that can contain duplicates. In the case of a DataSet the data is finite and the focus is batch processing, while for a DataStream the number of elements can be unbounded and the focus is real-time processing.

4. Flink Execution Architecture

                                          Figure 2: Flink Execution Architecture

Flink uses a master-slave architecture.

Job Manager

  • Acts as the master.

  • Responsible for scheduling, deploying, and monitoring the Task Managers.

  • It is job-specific: once the job completes, it shuts down on its own.

Task Manager

    • Acts as a slave.

    • Responsible for task execution.

5. Flink Setup

  1. Download a binary from the downloads page. You can pick any Hadoop/Scala combination you like. If you plan to just use the local file system, any Hadoop version will work fine.

  2. Go to the download directory.

  3. Unpack the downloaded archive.

    cd ~/Downloads        # go to the download directory

    tar xzf flink-*.tgz   # unpack the downloaded archive

    cd flink-1.5.0

  4. To start a local Flink cluster:

    ./bin/start-cluster.sh   # start Flink

  5. Check the Dispatcher's web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

                                     Figure 3: Flink dashboard

  6. You can also verify that the system is running by checking the log files in the logs directory.

    tail log/flink-*-standalonesession-*.log

  7. To execute a program in Flink:

    $ ./bin/flink run program.jar --port <port>   # start execution of the program

6. Flink Programming Concepts

Flink programs are regular programs that implement transformations on distributed collections. Collections are initially created from sources. Results are returned via sinks, which may, for example, write the data to files or to standard output.

6.1. Data Types

  1. Java Tuples:

Tuples are composite types that contain a fixed number of fields with various types. The Java API provides classes from Tuple1 up to Tuple25. Every field of a tuple can be an arbitrary Flink type including further tuples, resulting in nested tuples.

Fields of a tuple can be accessed directly by field name, e.g. tuple.f4, or using the generic getter method tuple.getField(int position). Field indices start at 0.
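A minimal sketch of creating and reading a tuple (the field values here are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;

Tuple2<String, Integer> wordCount = new Tuple2<>("hello", 1);   // a 2-tuple of (word, count)

String word = wordCount.f0;                 // access by field name
Integer count = wordCount.getField(1);      // access by position, indices start at 0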

  2. POJOs

Java and Scala classes are treated as POJO types by Flink if they satisfy the following conditions (a sketch follows the list).

    • The class must be public.

    • It must have a public constructor without arguments (a default constructor).

    • All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().

    • The type of a field must be supported by Flink. At the moment, Flink uses Avro to serialize arbitrary objects (such as Date).
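A minimal sketch of a class that meets these conditions (the class name and fields are illustrative):

public class WordWithCount {        // hypothetical example type
    public String word;             // public field
    private long count;             // private field, accessible via getter/setter

    public WordWithCount() {}       // public no-argument constructor

    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
}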

  3. Primitive types

Flink supports all Java and Scala primitive types such as Integer, String, and Double.

  4. Regular classes

All classes that are not identified as POJO types are handled by Flink as general class types. Flink treats these data types as black boxes and is not able to access their content. General types are de/serialized using the serialization framework Kryo.

  5. Values

Value types describe their serialization and deserialization manually. Instead of going through a general-purpose serialization framework, they provide custom code for those operations by implementing the org.apache.flink.types.Value interface with the methods read and write. Using a Value type is reasonable when general-purpose serialization would be highly inefficient.

  6. Hadoop Writables

You can use types that implement the org.apache.hadoop.io.Writable interface. The serialization logic defined in the write() and readFields() methods will be used for serialization.

  7. Special types

You can use special types, including Scala's Either, Option, and Try. The Java API has its own custom implementation of Either. Similarly to Scala's Either, it represents a value of one of two possible types, Left or Right. Either can be useful for error handling or for operators that need to output two different types of records.

6.2. Collections in Flink

Collections in Flink are immutable, meaning that once they are created you cannot add or remove elements. Nor can you simply inspect the elements inside them.

A collection is initially created by adding a source in a Flink program and new collections are derived from these by transforming them using API methods such as map, filter and so on.

The core classes of the Java DataSet API are found in the package org.apache.flink.api.java, while the classes of the Java DataStream API can be found in org.apache.flink.streaming.api.

6.3. Anatomy of a Flink Program

1. Obtain an execution environment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. Create/Load the Initial data

DataStream<String> text = env.readTextFile("file:///path/to/file");

3. Specify Transformations on this data

We can transform the input data as required by calling transformation methods on the DataStream with transformation functions. For example, a map transformation looks like this:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

4. Specify where to put the results of our computations

After obtaining a DataStream containing the final results, we can write it to an outside system by creating a sink. These are just some example methods for creating a sink:

writeAsText(String path)   // writes output to a file

print()                    // prints output to the console

5. Trigger the program Execution

Once you have specified the complete program, you need to trigger program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment, the execution will either be triggered on your local machine or your program will be submitted for execution on a cluster. Putting the five steps together gives a program like the sketch below.
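A minimal end-to-end sketch, assuming an illustrative file path and job name:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NumberParser {   // hypothetical example job
    public static void main(String[] args) throws Exception {
        // 1. obtain an execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. load the initial data (illustrative path)
        DataStream<String> text = env.readTextFile("file:///path/to/file");

        // 3. transform the data
        DataStream<Integer> parsed = text.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value);
            }
        });

        // 4. specify where to put the results
        parsed.print();

        // 5. trigger the execution
        env.execute("Number parsing job");
    }
}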

Side Output:

In addition to the main output stream, we can create any number of side output result streams as required. These side output streams can be of any type.
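A minimal sketch of emitting and reading a side output (the tag name, the parsing logic, and the input stream are assumptions):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// the tag identifies the side output; it must be created as an anonymous subclass
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected") {};

DataStream<String> input = ...;   // some existing stream (assumed)

SingleOutputStreamOperator<Integer> parsed = input.process(new ProcessFunction<String, Integer>() {
    @Override
    public void processElement(String value, Context ctx, Collector<Integer> out) {
        try {
            out.collect(Integer.parseInt(value));   // main output
        } catch (NumberFormatException e) {
            ctx.output(rejectedTag, value);         // side output for unparsable records
        }
    }
});

DataStream<String> rejected = parsed.getSideOutput(rejectedTag);   // retrieve the side output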

7. Streaming (DataStream API)

The DataStream API takes streaming data as input from a source, applies transformations on the input stream, generates a resultant output stream, and returns it via a sink.

7.1. Data Source

Flink programs read data from sources. A source can be anything: a file, a collection, a Kafka topic, etc. Flink has many built-in source functions, but you can always write your own custom sources if required.

A data source can be any of the following (see the sketch after the list):

  1. File-based

  2. Socket-based

  3. Collection-based

  4. Custom
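A minimal sketch of creating streams from each kind of source (host, port, and path are illustrative; the custom source is only indicated, and env is an existing StreamExecutionEnvironment):

DataStream<String> fromFile = env.readTextFile("file:///path/to/file");       // file-based
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);      // socket-based
DataStream<Integer> fromCollection = env.fromElements(1, 2, 3, 4, 5);         // collection-based
// custom: env.addSource(new MyCustomSourceFunction());                       // hypothetical custom SourceFunction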

7.2. Data Sink

Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams.

7.3. Time

Flink supports different notions of time in streaming programs.

  1. Processing time: Processing time refers to the system time of the machine that is executing the respective operation. When a streaming program runs on processing time, all time-based operations (like time windows) will use the system clock of the machines that run the respective operator.

  2. Event time: Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

  3. Ingestion time: Ingestion time is the time that events enter Flink. At the source operator each record gets the source's current time as a timestamp, and time-based operations (like time windows) refer to that timestamp.

Ingestion time sits conceptually in between event time and processing time. Compared to processing time, it is slightly more expensive, but gives more predictable results. Because ingestion time uses stable timestamps (assigned once at the source), different window operations over the records will refer to the same timestamp, whereas in processing time each window operator may assign the record to a different window (based on the local system clock and any transport delay).

7.4. Watermarks

  • A watermark is used to measure the progress of event time.

  • It flows as part of the stream and carries a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark).

  • Once a watermark reaches an operator, the operator can advance its internal event-time clock to the value of the watermark.

The figure below shows a stream of events with (logical) timestamps, and watermarks flowing inline. In this example the events are in order (with respect to their timestamps), meaning that the watermarks are simply periodic markers in the stream.
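A minimal sketch of assigning timestamps and generating watermarks with a bounded out-of-orderness extractor (MyEvent, its getCreationTime() accessor, the input stream, and the 5-second bound are assumptions):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<MyEvent> events = ...;   // some existing event stream (assumed)

DataStream<MyEvent> withTimestamps = events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {   // tolerate 5 s of out-of-orderness
        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();   // hypothetical timestamp accessor
        }
    });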

7.5. Operators

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations based on requirement.

There are different types of transformations that we can apply to an input data stream, such as map, flatMap, filter, keyBy, etc. Refer to the DataStream Transformations documentation for syntax and detailed information.

Windows

  • Windows split the data stream into “buckets” of finite size, over which we can apply computations.

  • A window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness.

  • Flink guarantees removal only for time-based windows and not for other types.

  • Each window has a trigger and a function.

  • The function will contain the computation to be applied to the contents of the window.

  • The trigger specifies the conditions under which the window is considered ready for the function to be applied.

  • A trigger can also decide to purge a window's contents at any time between its creation and removal. Purging in this case only refers to the elements in the window, not the window metadata. This means that new data can still be added to that window.

  • An evictor can remove elements from the window after the trigger fires and before and/or after the function is applied.

Types of Windows:

  1. Keyed windows

  2. Non-keyed windows

Window Assigners

  1. Tumbling window: Assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes (see the sketch after this list).

  2. Sliding window: The sliding windows assigner assigns elements to windows of fixed length. Similar to a tumbling windows assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can overlap if the slide is smaller than the window size. In this case elements are assigned to multiple windows.

  3. Session window: The session windows assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time, in contrast to tumbling and sliding windows. Instead, a session window closes when it does not receive elements for a certain period of time.

  4. Global window: Groups all events with the same key into a single window. Custom trigger logic must be specified because global windows never end automatically.
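A minimal sketch of keyed tumbling and sliding event-time windows over a stream of (word, count) tuples (the field positions, window sizes, and input stream are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Tuple2<String, Integer>> input = ...;   // some existing stream (assumed)

// tumbling: one result per key every 5 minutes, windows do not overlap
DataStream<Tuple2<String, Integer>> tumbling = input
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum(1);

// sliding: 10-minute windows evaluated every 5 minutes, windows overlap
DataStream<Tuple2<String, Integer>> sliding = input
    .keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum(1);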

7.6. State and Fault Tolerance

Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for any type of more elaborate operation.

There are two basic kinds of state in Flink (a keyed-state sketch follows the list):

  1. Keyed State: It is always relative to keys and can only be used in functions and operators on a KeyedStream.

  2. Operator State (or non-keyed state): Each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
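A minimal sketch of keyed state using a ValueState inside a RichFlatMapFunction (the per-key counting logic is illustrative):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWithKeyedState extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> countState;   // one count per key

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = countState.value();
        long updated = (current == null ? 0L : current) + 1;
        countState.update(updated);
        out.collect(new Tuple2<>(value.f0, updated));
    }
}

// usage: keyed state only works on a KeyedStream, so key the stream first
// stream.keyBy(0).flatMap(new CountWithKeyedState());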

Flink needs to be aware of the state in order to make state fault tolerant using checkpoints and to allow savepoints of streaming applications.

Checkpoints: Every function and operator in Flink can be stateful. Stateful functions store data across the processing of individual elements, making state a critical building block for any type of more elaborate operation.

In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution.
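A minimal sketch of enabling checkpointing on the execution environment (the 10-second interval and exactly-once mode are illustrative choices):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);   // take a checkpoint every 10 000 ms
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);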

Savepoints: Savepoints are externally stored, self-contained checkpoints that we can use to stop-and-resume or update our Flink programs. They use Flink's checkpointing mechanism to create a (non-incremental) snapshot of the state of our streaming program and write the checkpoint data and metadata out to an external file system.

7.7. Connectors

Connectors provide code for integrating Flink with various third-party systems. Currently, the following systems are supported (a Kafka source sketch follows the list):

    • Apache Kafka (source/sink)

    • Apache Cassandra (sink)

    • Amazon Kinesis Streams (source/sink)

    • Elasticsearch (sink)

    • Hadoop FileSystem (sink)

    • RabbitMQ (source/sink)

    • Apache NiFi (source/sink)

    • Twitter Streaming API (source)
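A minimal sketch of reading from Kafka as a source; the connector class matches the Kafka 0.11 connector available around Flink 1.5, and the topic name, broker address, and group id are illustrative:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // illustrative broker
props.setProperty("group.id", "flink-demo");                // illustrative consumer group

DataStream<String> lines = env.addSource(
    new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props));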

8. Batch Processing (DataSet API)

The DataSet API takes a dataset as input from a source, applies transformations to the input dataset, generates a result, and returns it via a sink.

8.1. DataSet Transformation

The data provided by the external source may or may not be in the required format. Data in the wrong format may not produce the required results, so the given input often needs to be transformed into the required format for further processing.

Similar to the DataStream API, the DataSet API also provides various functions such as map, flatMap, reduce, aggregate, etc., for transforming an input DataSet into the required format. Refer to the DataSet Transformations documentation for the various types of transformation functions and their syntax. A small sketch follows.
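A minimal word-count style sketch of DataSet transformations (the input values are illustrative):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.fromElements("to be or not to be");   // illustrative input

DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    })
    .groupBy(0)   // group by the word
    .sum(1);      // sum the counts

counts.print();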

8.2. Data Sources

The DataSet API accepts data in many ways, such as from files, Java collections, etc. Flink provides various built-in methods to create datasets from a given source (see the sketch after the list). They are:

  1. File-based: accepts text and CSV files.

  2. Collection-based: accepts Java collections as input.

  3. Generic: accepts file and generic input formats.
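A minimal sketch of the built-in DataSet source methods (the paths and tuple field types are illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// file-based
DataSet<String> lines = env.readTextFile("file:///path/to/file");

DataSet<Tuple2<String, Integer>> csv = env
    .readCsvFile("file:///path/to/file.csv")
    .types(String.class, Integer.class);

// collection-based
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);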

8.3. Data Sinks

Data sinks consume DataSets obtained after transformation and store or return them as output. Flink comes with a variety of built-in output formats. We can print the output to the console, write it to a text/CSV file, etc.

8.4. Iteration Operations

Iteration operations are used to implement loops in Flink. Flink programs implement iterative algorithms by defining a step function and embedding it into a special iteration operator.

There are two types of iterations in Flink:

  1. Bulk Iteration: Used for performing simple iterations. In each iteration the step function (which may be a map, reduce, aggregate, etc.) consumes the entire input and computes the next partial solution (see the sketch after this list).

  2. Delta Iteration: Covers the case of incremental iterations. It focuses only on the values that change in each iteration, which leads to more efficient algorithms.
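A minimal sketch of a bulk iteration that increments a value a fixed number of times (the iteration count and the map are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// start the iteration from an initial data set, running at most 10 iterations
IterativeDataSet<Integer> initial = env.fromElements(0).iterate(10);

// the step function: what happens in each iteration
DataSet<Integer> step = initial.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) {
        return value + 1;
    }
});

// close the iteration and obtain the final result
DataSet<Integer> result = initial.closeWith(step);

result.print();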

8.5. Fault Tolerance

  • The batch-processing DataSet API retries failed executions. We can configure how many times a failed execution is retried before the job is declared failed.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setNumberOfExecutionRetries(3);

  • We can also define default values for the number of execution retries and the retry delay in the flink-conf.yaml

execution-retries.default: 3

  • Execution retries can be delayed if we want to retry failed executions after some time

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay

  • We can also define the default value for the retry delay in flink-conf.yaml:

    execution-retries.delay: 10 s

8.6. Zipping Elements

Zipping elements is used when we want to assign unique identifiers to the elements of a data set. There are two ways to assign identifiers (a sketch follows the examples below):

  1. Zip with a dense index: zipWithIndex assigns consecutive labels to the elements, receiving a data set as input and returning a new data set of (unique id, initial value) 2-tuples. This process requires two passes, first counting and then labeling the elements, and cannot be pipelined due to the synchronization of counts.

Example:

(0,G), (1,H), (2,A), (3,B), (4,C), (5,D), (6,E), (7,F)

  2. Zip with a unique identifier: In some cases we may not need consecutive labels. In such cases we can use zipWithUniqueId, which works in a pipelined fashion. This method receives a data set as input and returns a new data set of (unique id, initial value) 2-tuples.

Example:

(0,G), (1,A), (2,H), (3,B), (5,C), (7,D), (9,E), (11,F)
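A minimal sketch using the DataSetUtils helpers (the input elements are illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> input = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");

// dense, consecutive ids (two passes over the data)
DataSet<Tuple2<Long, String>> withIndex = DataSetUtils.zipWithIndex(input);

// unique but not necessarily consecutive ids (pipelined)
DataSet<Tuple2<Long, String>> withUniqueId = DataSetUtils.zipWithUniqueId(input);

withIndex.print();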

8.7. Connectors

Flink has built-in support for connecting to file systems and databases such as:

  1. HDFS

  2. Amazon S3

  3. MapR file system

  4. Alluxio

  5. Avro

  6. Microsoft Azure Table Storage

  7. MongoDB

8.8. Hadoop Compatibility

Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows reusing code that was implemented for Hadoop MapReduce.

We can (see the sketch after the list):

  • use Hadoop Writable data types in Flink programs.

  • use any Hadoop InputFormat as a DataSource.

  • use any Hadoop OutputFormat as a DataSink.

  • use a Hadoop Mapper as FlatMapFunction.

  • use a Hadoop Reducer as GroupReduceFunction.
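A minimal sketch of reading a file with the Hadoop TextInputFormat, assuming the HadoopInputs helper from the flink-hadoop-compatibility module is on the classpath (the path is illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read (byte offset, line) pairs using the Hadoop TextInputFormat
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(new TextInputFormat(),
        LongWritable.class, Text.class, "hdfs:///path/to/file"));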

9. Complex Event Processing (CEP) with Flink

FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink. It allows us to detect event patterns in an unbounded stream of events and gives us the opportunity to select the required data from the source stream.

9.1. Pattern API

  • The Pattern API allows us to specify the patterns that we want to detect in our stream.

  • It allows us to quickly define complex event patterns.

  • Each pattern consists of multiple stages (states).

  • To go from one state to another, the user can specify conditions.

  • These conditions can be the contiguity of events or a filter condition on an event.

  • Each pattern must have a unique name, which you use later to identify the matched events.

  • Pattern names cannot contain the character “:”.

  • A Pattern can be either a singleton or a looping pattern. Singleton patterns accept a single event, while looping patterns can accept more than one.

  • By default, a pattern is a singleton pattern and you can transform it to a looping one by using Quantifiers. Each pattern can have one or more Conditions based on which it accepts events.

9.2. Quantifiers

We can specify looping patterns using these methods: pattern.oneOrMore(), for patterns that expect one or more occurrences of a given event, and pattern.times(#ofTimes), for patterns that expect a specific number of occurrences of a given type of event, as sketched below.
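A minimal sketch of applying quantifiers to a pattern (the pattern name and the Event class are assumptions):

import org.apache.flink.cep.pattern.Pattern;

Pattern<Event, ?> start = Pattern.<Event>begin("start");   // "Event" is an assumed event class

// expect one or more matching events
Pattern<Event, ?> oneOrMore = start.oneOrMore();

// expect exactly four matching events
Pattern<Event, ?> exactlyFour = start.times(4);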

9.3. Conditions

We can specify conditions that must be satisfied in order to move from one state to another in a pattern. We can specify conditions on the data stream using the pattern.where(), pattern.or(), or pattern.until() methods. These can be either iterative conditions or simple conditions.

Iterative Conditions: This is the most general type of condition. It lets you specify a condition that accepts subsequent events based on properties of the previously accepted events or a statistic over a subset of them.

Simple Conditions: This type of condition extends the aforementioned Iterative Conditions class and decides whether to accept an event or not, based only on properties of the event itself.

Conditions on Contiguity

FlinkCEP supports the following forms of contiguity between events (see the sketch after the list):

  1. Strict Contiguity: Expects all matching events to appear strictly one after the other, without any non-matching events in-between.

  2. Relaxed Contiguity: Ignores non-matching events appearing in-between the matching ones.

  3. Non-Deterministic Relaxed Contiguity: Further relaxes contiguity, allowing additional matches that ignore some matching events.
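A minimal sketch of a pattern combining strict contiguity via next(), relaxed contiguity via followedBy(), simple conditions, and a time bound (the Event class, its getName() accessor, and the event names are assumptions):

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("login");      // hypothetical accessor and value
        }
    })
    .next("middle")                                      // strict contiguity
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("error");
        }
    })
    .followedBy("end")                                   // relaxed contiguity
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getName().equals("logout");
        }
    })
    .within(Time.seconds(10));                           // whole sequence must match within 10 s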

Pattern operations and their descriptions:

begin
    Defines a starting pattern state.

next
    Appends a new pattern state. A matching event has to directly succeed the previous matching event.

followedBy
    Appends a new pattern state. Other events can occur between a matching event and the previous matching event.

where(condition)
    Defines a filter condition for the current pattern state. Only if an event passes the filter can it match the state. Multiple consecutive where() clauses lead to their conditions being ANDed.

or(condition)
    Adds a new condition that is ORed with an existing one. An event can match the pattern only if it passes at least one of the conditions.

until(condition)
    Specifies a stop condition for a looping pattern: if an event matching the given condition occurs, no more events will be accepted into the pattern. Applicable only in conjunction with oneOrMore().

subtype
    Defines the subtype condition for the current pattern state. Only if an event is of this subtype can it match the state.

oneOrMore()
    Specifies that this pattern expects at least one occurrence of a matching event. By default, a relaxed internal contiguity (between subsequent events) is used.

timesOrMore(#times)
    Specifies that this pattern expects at least #times occurrences of a matching event. By default, a relaxed internal contiguity (between subsequent events) is used.

times(#ofTimes)
    Specifies that this pattern expects an exact number of occurrences of a matching event. By default, a relaxed internal contiguity (between subsequent events) is used.

times(#fromTimes, #toTimes)
    Specifies that this pattern expects between #fromTimes and #toTimes occurrences of a matching event. By default, a relaxed internal contiguity (between subsequent events) is used.

optional()
    Specifies that this pattern is optional, i.e. it may not occur at all. Applicable to all aforementioned quantifiers.

greedy()
    Specifies that this pattern is greedy, i.e. it will repeat as many times as possible. Only applicable to quantifiers; group patterns are currently not supported.

within
    Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded.

consecutive()
    Works in conjunction with oneOrMore() and times() and imposes strict contiguity between the matching events, i.e. any non-matching element breaks the match (as in next()). If not applied, a relaxed contiguity is used.

allowCombinations()
    Works in conjunction with oneOrMore() and times() and imposes non-deterministic relaxed contiguity between the matching events. If not applied, a relaxed contiguity is used.

9.4. Detecting Patterns

After defining a pattern, we can apply it to an input stream using CEP.pattern(), which produces a PatternStream. It takes the input stream, the pattern, and an optional comparator (used to sort events that have the same timestamp) and returns a stream of detected matches.

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

9.5. Selecting from Patterns

Once we have obtained a PatternStream, we can select from the detected event sequences using the select or flatSelect methods.

9.5.1. Select Method

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, List<IN>> pattern) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        return new OUT(startEvent, endEvent);
    }
}

9.5.2. Flat Select Method

class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
    @Override
    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
        IN startEvent = pattern.get("start").get(0);
        IN endEvent = pattern.get("end").get(0);
        for (int i = 0; i < startEvent.getValue(); i++) {
            collector.collect(new OUT(startEvent, endEvent));
        }
    }
}

9.6. Handling Timed Out Partial Patterns

Whenever a pattern has a window length attached via the within keyword, it is possible that partial event sequences are discarded because they exceed the window length. To react to these timed out partial matches the select and flatSelect API calls allow you to specify a timeout handler.

9.7. Anatomy of a Flink CEP Program

  • Define a data stream source:

DataStream<Event> input = ...;

  • Create an event pattern with initial state

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(condition);

  • Then create a pattern stream of events

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

  • Then create a complex event data stream from the matched pattern

DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> pattern) throws Exception {
        return createAlert(pattern);
    }
});

10. Sample Application using the Flink DataStream API and CEP with Kafka

Source: https://github.com/NikhilReddyPurumandla/flinkicu