CEP - Complex Event Processing

Typical use cases: the payment has not been confirmed within a certain period of time after the order was placed; a taxi order was created, but the passenger has not been confirmed as boarded within a certain period of time; a takeaway order has not been confirmed as delivered within a certain period after the scheduled delivery time.

Apache FlinkCEP API

CEPTimeoutEventJob

Brief Analysis of FlinkCEP Source Code

DataStream and PatternStream

A DataStream is generally composed of events or elements of the same type. A DataStream can be converted into another DataStream through transformations such as filter and map.

PatternStream is the abstraction of a CEP pattern-matching stream: it combines a DataStream with a Pattern and provides methods such as select and flatSelect. A PatternStream is not itself a DataStream. Its select and flatSelect methods hand each matched pattern sequence, as a map from pattern name to the associated events (that is, Map<pattern name, List<event>>), to a user function, and return the results as a SingleOutputStreamOperator, which is a DataStream.
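To make the select/flatSelect contract concrete without a Flink cluster, here is a plain-Java model, not the Flink API. SelectFn and FlatSelectFn are hypothetical stand-ins that only mirror the shapes of PatternSelectFunction and PatternFlatSelectFunction, and each match is the Map<pattern name, List<event>> described above. The point it shows: select yields exactly one output per match, while flatSelect may emit zero, one or many.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Plain-Java model (no Flink dependency) of how select and flatSelect consume matches.
 * SelectFn and FlatSelectFn are hypothetical stand-ins, not Flink interfaces.
 */
public class SelectModel {

    /** Mirrors the shape of PatternSelectFunction: exactly one result per match. */
    interface SelectFn<IN, OUT> {
        OUT select(Map<String, List<IN>> match);
    }

    /** Mirrors the shape of PatternFlatSelectFunction: zero or more results per match. */
    interface FlatSelectFn<IN, OUT> {
        void flatSelect(Map<String, List<IN>> match, Consumer<OUT> out);
    }

    /** Applies a select function to every match: one output per match. */
    static <IN, OUT> List<OUT> select(List<Map<String, List<IN>>> matches, SelectFn<IN, OUT> fn) {
        List<OUT> result = new ArrayList<>();
        for (Map<String, List<IN>> m : matches) {
            result.add(fn.select(m));
        }
        return result;
    }

    /** Applies a flat-select function: each match may emit any number of outputs. */
    static <IN, OUT> List<OUT> flatSelect(List<Map<String, List<IN>>> matches, FlatSelectFn<IN, OUT> fn) {
        List<OUT> result = new ArrayList<>();
        for (Map<String, List<IN>> m : matches) {
            // the Consumer plays the role of Flink's Collector
            fn.flatSelect(m, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, List<String>>> matches = List.of(
                Map.of("init", List.of("ID000-0:02"), "end", List.of("ID000-0:00")),
                Map.of("init", List.of("ID000-3:02"), "end", List.of("ID000-3:01")));

        // select: one summary string per match
        List<String> summaries = select(matches, m -> m.get("init").get(0) + "->" + m.get("end").get(0));

        // flatSelect: emit every event of every match
        List<String> all = flatSelect(matches, (m, out) -> {
            m.get("init").forEach(out);
            m.get("end").forEach(out);
        });

        System.out.println(summaries);
        System.out.println(all);
    }
}
```

In Flink itself, the corresponding functions are passed to PatternStream.select(...) or flatSelect(...), and their results form the returned SingleOutputStreamOperator.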
The methods and variables in the CEPOperatorUtils utility class are named with "PatternStream" even though what they produce is a SingleOutputStreamOperator, for example:

public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...) {...}

public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...) {...}

final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator:

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {...}

PatternStream constructors:

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = null;
}

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
    this.inputStream = inputStream;
    this.pattern = pattern;
    this.comparator = comparator;
}

Pattern, Quantifier and EventComparator

Pattern is the base class for pattern definitions and follows the builder pattern. NFACompiler uses the defined pattern to generate the NFA. If you want to implement your own methods similar to next and followedBy, such as a timeEnd method, it should be feasible to extend Pattern and override its behavior.

public class Pattern<T, F extends T> {

    /** Name of the pattern */
    private final String name;

    /** Previous pattern */
    private final Pattern<T, ? extends T> previous;

    /** Condition an event must meet to be matched by the current pattern */
    private IterativeCondition<F> condition;

    /** Length of the time window: the pattern must match within this time */
    private Time windowTime;

    /** Pattern quantifier, i.e. how many events the pattern matches. By default, it matches one */
    private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);

    /** Condition an event must meet to stop collecting events into the looping state */
    private IterativeCondition<F> untilCondition;

    /**
     * Applies to the {@code times} pattern; holds how many times an event
     * may occur consecutively in this pattern
     */
    private Times times;

    // Skip strategy applied after a match
    private final AfterMatchSkipStrategy afterMatchSkipStrategy;

    ...
}

Quantifier describes the concrete behavior of a pattern. There are three main categories: Single (matches once), Looping (matches repeatedly) and Times (matches a fixed number of times, or a number of times within a range). Each pattern can be optional (single or looping) and can have a ConsumingStrategy set. Looping and Times patterns additionally have an inner ConsumingStrategy that applies between the events accepted by the pattern.

public class Quantifier {
    ...
    /**
     * 5 properties; they can be combined, but not all combinations are valid
     */
    public enum QuantifierProperty {
        SINGLE,
        LOOPING,
        TIMES,
        OPTIONAL,
        GREEDY
    }

    /**
     * Strategy describing which events are matched in this pattern
     */
    public enum ConsumingStrategy {
        STRICT,
        SKIP_TILL_NEXT,
        SKIP_TILL_ANY,
        NOT_FOLLOW,
        NOT_NEXT
    }

    /**
     * Describes how many times an event can occur consecutively in the current pattern.
     * A pattern condition is just a boolean; events for which it is true may be required to
     * occur a fixed number of times, or a number of times within a range such as 2 to 4.
     * In the range case, runs of 2, 3 and 4 events are all matched by the current pattern,
     * so the same event can appear in several matches.
     */
    public static class Times {
        private final int from;
        private final int to;

        private Times(int from, int to) {
            Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
            Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
            this.from = from;
            this.to = to;
        }

        public int getFrom() {
            return from;
        }

        public int getTo() {
            return to;
        }

        // A range of times
        public static Times of(int from, int to) {
            return new Times(from, to);
        }

        // An exact number of times
        public static Times of(int times) {
            return new Times(times, times);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Times times = (Times) o;
            return from == times.from && to == times.to;
        }

        @Override
        public int hashCode() {
            return Objects.hash(from, to);
        }
    }
    ...
}

EventComparator: to define a custom event comparator, implement the EventComparator interface. It is used to order events that have the same timestamp.

public interface EventComparator<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
}

NFACompiler and NFA

NFACompiler provides methods to compile a Pattern into an NFA or into an NFAFactory; one NFAFactory can create multiple NFAs.

public class NFACompiler {
    ...
    /**
     * Factory interface for creating NFAs
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    public interface NFAFactory<T> extends Serializable {
        NFA<T> createNFA();
    }

    /**
     * NFAFactoryImpl, the concrete implementation of NFAFactory
     *
     * <p>The implementation takes the input type serializer, the window time and the set of
     * states and their transitions to be able to create an NFA from them.
     *
     * @param <T> Type of the input events which are processed by the NFA
     */
    private static class NFAFactoryImpl<T> implements NFAFactory<T> {

        private static final long serialVersionUID = 8939783698296714379L;

        private final long windowTime;
        private final Collection<State<T>> states;
        private final boolean timeoutHandling;

        private NFAFactoryImpl(long windowTime, Collection<State<T>> states, boolean timeoutHandling) {
            this.windowTime = windowTime;
            this.states = states;
            this.timeoutHandling = timeoutHandling;
        }

        @Override
        public NFA<T> createNFA() {
            // An NFA consists of the set of states, the time window length, and whether to handle timeouts
            return new NFA<>(states, windowTime, timeoutHandling);
        }
    }
}

NFA: Non-deterministic finite automaton, i.e. a non-deterministic finite (state) automaton. For more information, see https://zh.wikipedia.org/wiki/Non-deterministic finite state automaton

public class NFA<T> {

    /**
     * The set of all valid NFA states returned by NFACompiler.
     * These are directly derived from the user-specified pattern.
     */
    private final Map<String, State<T>> states;

    /**
     * Time window length specified by Pattern.within(Time)
     */
    private final long windowTime;

    /**
     * Whether timed-out matches are handled
     */
    private final boolean handleTimeout;
    ...
}

PatternSelectFunction and PatternFlatSelectFunction

The select() method of PatternSelectFunction is called with a map containing the matched events, which are accessible by pattern name. The pattern names are the ones specified when the Pattern was defined. select() returns exactly one result; if you need to return multiple results, implement PatternFlatSelectFunction instead.

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates a result from the given event map. The events are identified
     * by the name of the pattern they are associated with
     */
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}

PatternFlatSelectFunction does not return an OUT; instead it emits its results through a Collector, so it can produce zero, one or many results per match.

public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

    /**
     * Generates one or more results
     */
    void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}

SelectTimeoutCepOperator and PatternTimeoutFunction

SelectTimeoutCepOperator is created when createTimeoutPatternStream() in CEPOperatorUtils is called. The SelectTimeoutCepOperator methods invoked while the operator processes data are processMatchedSequences() and processTimedOutSequences(). They are template-method hooks, called from the processEvent() and advanceTime() methods of the abstract class AbstractKeyedCEPPatternOperator. There is also a FlatSelectTimeoutCepOperator with its corresponding PatternFlatTimeoutFunction.

public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
        extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

    private OutputTag<OUT2> timedOutOutputTag;

    public SelectTimeoutCepOperator(
            TypeSerializer<IN> inputSerializer,
            boolean isProcessingTime,
            NFACompiler.NFAFactory<IN> nfaFactory,
            final EventComparator<IN> comparator,
            AfterMatchSkipStrategy skipStrategy,
            // Confusing naming: these parameters are called "flat..." although they are not flat functions;
            // the members of the SelectWrapper class are named the same way
            PatternSelectFunction<IN, OUT1> flatSelectFunction,
            PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
            OutputTag<OUT2> outputTag,
            OutputTag<IN> lateDataOutputTag) {
        super(
            inputSerializer,
            isProcessingTime,
            nfaFactory,
            comparator,
            skipStrategy,
            new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
            lateDataOutputTag);
        this.timedOutOutputTag = outputTag;
    }
    ...
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
    OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
    void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}

CEP and CEPOperatorUtils

CEP is a utility class for creating a PatternStream; a PatternStream is simply the combination of a DataStream and a Pattern.

public class CEP {

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
        return new PatternStream<>(input, pattern);
    }

    public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
        return new PatternStream<>(input, pattern, comparator);
    }
}

CEPOperatorUtils creates the SingleOutputStreamOperator (a DataStream) when the select() or flatSelect() method of PatternStream is called.

public class CEPOperatorUtils {
    ...
    private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
            final DataStream<IN> inputStream,
            final Pattern<IN, ?> pattern,
            final TypeInformation<OUT> outTypeInfo,
            final boolean timeoutHandling,
            final EventComparator<IN> comparator,
            final OperatorBuilder<IN, OUT> operatorBuilder) {
        final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

        // check whether we use processing time
        final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

        // compile our pattern into a NFAFactory to instantiate NFAs later on
        final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

        final SingleOutputStreamOperator<OUT> patternStream;

        if (inputStream instanceof KeyedStream) {
            KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;

            patternStream = keyedStream.transform(
                operatorBuilder.getKeyedOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()));
        } else {
            KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

            patternStream = inputStream.keyBy(keySelector).transform(
                operatorBuilder.getOperatorName(),
                outTypeInfo,
                operatorBuilder.build(
                    inputSerializer,
                    isProcessingTime,
                    nfaFactory,
                    comparator,
                    pattern.getAfterMatchSkipStrategy()
                )).forceNonParallel();
        }

        return patternStream;
    }
    ...
}

FlinkCEP Implementation Steps
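The usual steps (key the stream, define a begin/followedBy pattern, match each key's events, then select) can be modelled in plain Java as below. This is a simplified sketch, not Flink's NFA: Stage, match and run are hypothetical names, only the first match per key is found, and skipping non-matching events imitates the relaxed contiguity of followedBy. Requires Java 16+ for the record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Plain-Java model (not the Flink API) of the usual CEP steps:
 * 1. keyBy, 2. begin/followedBy pattern, 3. match per key, 4. select.
 * Stage, match and run are hypothetical names for illustration only.
 */
public class CepStepsModel {

    /** One named pattern stage, e.g. "init" or "end", with its condition. */
    record Stage<E>(String name, Predicate<E> condition) {}

    /**
     * Returns the first match of the stages in order, or null if it never completes.
     * Events that do not satisfy the current stage are skipped (relaxed contiguity).
     */
    static <E> Map<String, List<E>> match(List<E> events, List<Stage<E>> stages) {
        Map<String, List<E>> result = new LinkedHashMap<>();
        int stage = 0;
        for (E e : events) {
            if (stage == stages.size()) {
                break;
            }
            Stage<E> current = stages.get(stage);
            if (current.condition().test(e)) {
                result.put(current.name(), List.of(e));
                stage++;
            }
        }
        return stage == stages.size() ? result : null;
    }

    /** Keys the events, matches each key's events, and applies the select function. */
    static <E, K, OUT> List<OUT> run(List<E> events, Function<E, K> keySelector,
                                     List<Stage<E>> stages,
                                     Function<Map<String, List<E>>, OUT> select) {
        // Step 1: keyBy, preserving arrival order within each key
        Map<K, List<E>> byKey = new LinkedHashMap<>();
        for (E e : events) {
            byKey.computeIfAbsent(keySelector.apply(e), k -> new ArrayList<>()).add(e);
        }
        // Steps 3 and 4: match per key, then one select result per match
        List<OUT> out = new ArrayList<>();
        for (List<E> keyed : byKey.values()) {
            Map<String, List<E>> m = match(keyed, stages);
            if (m != null) {
                out.add(select.apply(m));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Events encoded as "aid:status" strings
        List<String> events = List.of("ID000-0:02", "ID000-1:02", "ID000-0:00",
                "ID000-3:02", "ID000-3:03", "ID000-3:01");
        // Step 2: "init" (status 02) followed by "end" (status 00 or 01)
        List<Stage<String>> stages = List.of(
                new Stage<String>("init", s -> s.endsWith(":02")),
                new Stage<String>("end", s -> s.endsWith(":00") || s.endsWith(":01")));
        System.out.println(run(events, s -> s.split(":")[0], stages,
                m -> m.get("init").get(0) + "->" + m.get("end").get(0)));
    }
}
```

Here ID000-3's 03 event is skipped and its 02 and 01 events form the match, while ID000-1 never completes; in FlinkCEP the incomplete ID000-1 sequence is what the timeout handling picks up.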
FlinkCEP Matching Timeout Implementation Steps

A timeout CEP stream needs keyBy, i.e. it must be a KeyedStream. If the input stream is not a KeyedStream, CEPOperatorUtils (see the source above) keys every element with a NullByteKeySelector, which assigns the same Byte key 0 to all elements, and forces the operator to run non-parallel:

KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

The pattern definition ends with a call to within() to set the window time. If you key the stream by the primary key, at most one timeout event per key is matched in a time window, so PatternStream.select(...) is sufficient.
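The keyed timeout logic can be modelled in plain Java to show when a timeout becomes visible. This is a simplified sketch, not FlinkCEP's implementation: TimeoutModel, Event, onEvent and onWatermark are hypothetical names, at most one pending "init" per key is tracked (the one-timeout-per-key assumption above), and in this model a partial match times out once event time reaches the init timestamp plus the window. Requires Java 16+ for the record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (not the Flink API) of keyed "init followed by end within a window"
 * timeout detection driven by an event-time watermark. All names are hypothetical.
 */
public class TimeoutModel {

    /** Minimal event: key (aid), status code and event timestamp in milliseconds. */
    record Event(String aid, String status, long timestamp) {}

    private final long windowMillis;
    // Pending partial matches: aid -> timestamp of its "init" (status 02) event.
    // LinkedHashMap keeps the reporting order deterministic.
    private final Map<String, Long> pendingInit = new LinkedHashMap<>();
    private final List<String> completed = new ArrayList<>();
    private final List<String> timedOut = new ArrayList<>();

    TimeoutModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Processes one event; events of a key are assumed to arrive in timestamp order. */
    void onEvent(Event e) {
        Long start = pendingInit.get(e.aid());
        if (start != null && e.timestamp() - start >= windowMillis) {
            // The pending match expired before this event: report it as timed out.
            pendingInit.remove(e.aid());
            timedOut.add(e.aid());
            start = null;
        }
        boolean isEnd = "00".equals(e.status()) || "01".equals(e.status());
        if (start == null && "02".equals(e.status())) {
            // "init": open a partial match (simplification: at most one per key).
            pendingInit.put(e.aid(), e.timestamp());
        } else if (start != null && isEnd) {
            // "end" inside the window completes the match.
            pendingInit.remove(e.aid());
            completed.add(e.aid());
        }
        // Anything else (e.g. status 03, or a second 02 while pending) is skipped.
    }

    /** Advances event time: pending matches whose window has passed become timeouts. */
    void onWatermark(long watermark) {
        pendingInit.entrySet().removeIf(entry -> {
            boolean expired = watermark - entry.getValue() >= windowMillis;
            if (expired) {
                timedOut.add(entry.getKey());
            }
            return expired;
        });
    }

    List<String> completed() {
        return completed;
    }

    List<String> timedOut() {
        return timedOut;
    }
}
```

With a one-minute window, an init for ID000-1 at 500 ms is reported only once onWatermark is called with 60500 or more. If the watermark advances only as new events arrive, the timeout is therefore delayed until later events show up, which is the limitation discussed next.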
Shortcomings of FlinkCEP timeouts

As with Flink window aggregation, if you use event time and the watermark only moves forward as events arrive, later events must arrive and advance the watermark before the window is triggered and the timeout result is computed and output.

Complete FlinkCEP timeout demo

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
// POJO and POJOSchema are the author's own classes (not shown)

public class CEPTimeoutEventJob {

    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
    private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
    private static final String GROUP_TOPIC = GROUP_ID;

    public static void main(String[] args) throws Exception {
        // ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

        // Do not use the time carried in the POJO; assign ingestion time instead
        final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<POJO>();

        // Keep the parallelism consistent with the number of partitions of the Kafka topic
        env.setParallelism(3);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", GROUP_ID);

        // Consume Kafka messages
        FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
        DataStream<POJO> pojoDataStream = env.addSource(consumer)
                .assignTimestampsAndWatermarks(extractor);
        pojoDataStream.print();

        // Key by the primary key aid, i.e. run match detection for each POJO [different types of POJOs can use different within times]
        // 1.
        DataStream<POJO> keyedPojos = pojoDataStream
                .keyBy("aid");

        // A complete POJO event sequence, from initialization to a final state
        // 2.
        Pattern<POJO, POJO> completedPojo =
                Pattern.<POJO>begin("init")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = -6847788055093903603L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "02".equals(pojo.getAstatus());
                            }
                        })
                        .followedBy("end")
                        // .next("end")
                        .where(new SimpleCondition<POJO>() {
                            private static final long serialVersionUID = -2655089736460847552L;

                            @Override
                            public boolean filter(POJO pojo) throws Exception {
                                return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
                            }
                        });

        // Find the aids that have not reached a final state within 1 minute [for testing]
        // If different types need different within times, e.g. 1 minute for some and 1 hour for others, create multiple PatternStreams
        // 3.
        PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

        // Define the side output for timed-out matches
        // 4.
        OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
            private static final long serialVersionUID = 773503794597666247L;
        };

        // flatSelect(OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction)
        // 5.
        SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
                timedout,
                new POJOTimedOut(),
                new FlatSelectNothing<POJO>()
        );

        // Print the timed-out POJOs
        // 6.7.
        timeoutPojos.getSideOutput(timedout).print();
        timeoutPojos.print();
        env.execute(CEPTimeoutEventJob.class.getSimpleName());
    }

    /**
     * Collects the timed-out events
     */
    public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
        private static final long serialVersionUID = -4214641891396057732L;

        @Override
        public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
            if (null != map.get("init")) {
                for (POJO pojoInit : map.get("init")) {
                    System.out.println("timeout init:" + pojoInit.getAid());
                    collector.collect(pojoInit);
                }
            }
            // The match timed out before "end" was received, so "end" is not available here
            System.out.println("timeout end: " + map.get("end"));
        }
    }

    /**
     * Usually does nothing, although it could also send all matched events downstream.
     * With relaxed contiguity (followedBy), skipped events are not part of the match,
     * so they cannot be selected and sent downstream.
     * Called for init/end sequences completed within one minute.
     *
     * @param <T>
     */
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        private static final long serialVersionUID = -3029589950677623844L;

        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
            System.out.println("flatSelect: " + pattern);
        }
    }
}

Test results (followedBy):

3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}]}
timeout init:ID000-1
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}]}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
timeout init:ID000-4
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null

Summary

The above are the steps for implementing timeout monitoring with Apache FlinkCEP. I hope it is helpful to you.