Detailed steps for implementing timeout status monitoring in Apache FlinkCEP

CEP - Complex Event Processing.

Typical scenarios that call for timeout monitoring:

An order is placed, but the payment is not confirmed within a certain period of time.

A taxi order is created, but the passenger is not confirmed to have boarded within a certain period of time.

A takeaway order is not confirmed as delivered within a certain period of time after the scheduled delivery time.

Apache FlinkCEP API

CEPTimeoutEventJob

Brief Analysis of FlinkCEP Source Code

DataStream and PatternStream

DataStream is generally composed of events or elements of the same type. A DataStream can be converted into another DataStream through a series of transformation operations such as Filter and Map.

PatternStream is an abstraction of the CEP pattern matching stream, which combines DataStream and Pattern and provides methods such as select and flatSelect. PatternStream is not a DataStream. It provides a method to send a map consisting of a matching pattern sequence and its associated events (that is, Map<pattern name, List<event>>) to SingleOutputStreamOperator, which is a DataStream.

The methods and variables in the CEPOperatorUtils utility class use "PatternStream" in their names even though they return a SingleOutputStreamOperator, for example:

public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(...){...}
public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(...){...}

final SingleOutputStreamOperator<OUT> patternStream;

SingleOutputStreamOperator

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {...}

PatternStream constructors:

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
  this.inputStream = inputStream;
  this.pattern = pattern;
  this.comparator = null;
}

PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
  this.inputStream = inputStream;
  this.pattern = pattern;
  this.comparator = comparator;
}

Pattern, Quantifier and EventComparator

Pattern is the base class for pattern definitions and follows the builder pattern. NFACompiler uses the defined pattern to generate an NFA.

If you want to implement your own methods similar to next and followedBy, such as timeEnd, it should be feasible to do so by extending Pattern and overriding its methods.

public class Pattern<T, F extends T> {
  /** Pattern name */
  private final String name;
  /** Previous pattern */
  private final Pattern<T, ? extends T> previous;
  /** Condition that an event must satisfy to be matched by the current pattern */
  private IterativeCondition<F> condition;
  /** Length of the time window within which the pattern must match */
  private Time windowTime;
  /** Pattern quantifier, that is, how many events the pattern matches; by default, exactly one */
  private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);
  /** Condition that an event must satisfy to stop collecting events into the looping state */
  private IterativeCondition<F> untilCondition;
  /**
   * Applicable to {@code times} patterns; holds the number of times an event can occur consecutively in the pattern
   */
  private Times times;
  // Skip strategy applied after a match
  private final AfterMatchSkipStrategy afterMatchSkipStrategy;
  ...
}

Quantifier describes how a specific pattern behaves. There are three main categories:

Single (matches once), Looping (matches repeatedly), and Times (matches a specific number of times or a range of times).

Each Pattern can be optional (for single and looping matches) and can have a ConsumingStrategy set.

Looping and Times patterns also have an additional inner ConsumingStrategy that applies between the events accepted by the pattern.

public class Quantifier {
  ...
  /**
   * Five properties that can be combined, although not every combination is valid
   */
  public enum QuantifierProperty {
    SINGLE,
    LOOPING,
    TIMES,
    OPTIONAL,
    GREEDY
  }

  /**
   * Strategy that describes which events this pattern matches
   */
  public enum ConsumingStrategy {
    STRICT,
    SKIP_TILL_NEXT,
    SKIP_TILL_ANY,
    NOT_FOLLOW,
    NOT_NEXT
  }

  /**
   * Describes how many times an event can occur consecutively in the current pattern.
   * A pattern condition is just a boolean predicate; events that satisfy it may occur
   * a fixed number of times or a range of times, such as 2 to 4. Runs of 2, 3, and 4
   * events are all matched by the pattern, so the same event can be matched repeatedly.
   */
  public static class Times {
    private final int from;
    private final int to;

    private Times(int from, int to) {
      Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
      Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
      this.from = from;
      this.to = to;
    }

    public int getFrom() {
      return from;
    }

    public int getTo() {
      return to;
    }

    // A range of times
    public static Times of(int from, int to) {
      return new Times(from, to);
    }

    // An exact number of times
    public static Times of(int times) {
      return new Times(times, times);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      Times times = (Times) o;
      return from == times.from &&
        to == times.to;
    }

    @Override
    public int hashCode() {
      return Objects.hash(from, to);
    }
  }
  ...
}
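To make the Times semantics concrete, here is a minimal sketch in plain Java. TimesSketch is a hypothetical stand-in, not the Flink class: it repeats the validation shown above without Flink's Preconditions, and the accepts() helper is an assumption added only to show which run lengths fall inside the range.

```java
/** Hypothetical stand-in for Quantifier.Times, for illustration only. */
final class TimesSketch {
  private final int from;
  private final int to;

  private TimesSketch(int from, int to) {
    // Same checks as the constructor above, written without Flink's Preconditions.
    if (from <= 0) {
      throw new IllegalArgumentException("The from should be a positive number greater than 0.");
    }
    if (to < from) {
      throw new IllegalArgumentException(
          "The to should be a number greater than or equal to from: " + from + ".");
    }
    this.from = from;
    this.to = to;
  }

  /** A range of occurrences, for example 2 to 4. */
  public static TimesSketch of(int from, int to) {
    return new TimesSketch(from, to);
  }

  /** An exact number of occurrences. */
  public static TimesSketch of(int times) {
    return new TimesSketch(times, times);
  }

  /** Assumed helper: true if a run of count matching events lies within the range. */
  public boolean accepts(int count) {
    return count >= from && count <= to;
  }

  public static void main(String[] args) {
    TimesSketch range = TimesSketch.of(2, 4);
    for (int count = 1; count <= 5; count++) {
      System.out.println(count + " -> " + range.accepts(count));
    }
  }
}
```

With of(2, 4), runs of 2, 3, and 4 events are accepted, while 1 and 5 are not; of(0, 1) is rejected by the first check.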

To define a custom event comparator, implement the EventComparator interface.

public interface EventComparator<T> extends Comparator<T>, Serializable {
  long serialVersionUID = 1L;
}
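Flink uses this comparator to order events that carry the same timestamp. As an illustration, an implementation might look like the sketch below. The interface is re-declared locally so the example compiles without Flink, and the Event class, its status field, and InitFirstComparator are all hypothetical names chosen for this example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative sketch only; Event and InitFirstComparator are hypothetical. */
final class EventComparatorSketch {

  /** Local copy of the interface shown above, so the sketch compiles without Flink. */
  interface EventComparator<T> extends Comparator<T>, Serializable {
    long serialVersionUID = 1L;
  }

  /** Hypothetical event type: an id plus a status code. */
  static final class Event {
    final String id;
    final String status;

    Event(String id, String status) {
      this.id = id;
      this.status = status;
    }
  }

  /** Puts init events (status "02") ahead of all other events. */
  static final class InitFirstComparator implements EventComparator<Event> {
    private static final long serialVersionUID = 1L;

    private static int rank(Event event) {
      return "02".equals(event.status) ? 0 : 1;
    }

    @Override
    public int compare(Event left, Event right) {
      return Integer.compare(rank(left), rank(right));
    }
  }

  /** Returns the ids of the given events after a stable sort with InitFirstComparator. */
  public static List<String> sortedIds(List<Event> events) {
    List<Event> copy = new ArrayList<>(events);
    copy.sort(new InitFirstComparator());
    List<String> ids = new ArrayList<>();
    for (Event event : copy) {
      ids.add(event.id);
    }
    return ids;
  }

  public static void main(String[] args) {
    List<Event> events = new ArrayList<>();
    events.add(new Event("end", "00"));
    events.add(new Event("init", "02"));
    System.out.println(sortedIds(events));
  }
}
```

In a real job, an instance of such a comparator would be passed as the third argument of CEP.pattern(input, pattern, comparator).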

NFACompiler and NFA

NFACompiler provides methods to compile Pattern into NFA or NFAFactory. Multiple NFAs can be created using NFAFactory.

public class NFACompiler {
  ...
  /**
   * Factory interface for creating NFAs.
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
  public interface NFAFactory<T> extends Serializable {
    NFA<T> createNFA();
  }

  /**
   * NFAFactoryImpl, the concrete implementation of NFAFactory.
   *
   * <p>The implementation takes the input type serializer, the window time and the set of
   * states and their transitions to be able to create an NFA from them.
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
  private static class NFAFactoryImpl<T> implements NFAFactory<T> {

    private static final long serialVersionUID = 8939783698296714379L;

    private final long windowTime;
    private final Collection<State<T>> states;
    private final boolean timeoutHandling;

    private NFAFactoryImpl(
        long windowTime,
        Collection<State<T>> states,
        boolean timeoutHandling) {

      this.windowTime = windowTime;
      this.states = states;
      this.timeoutHandling = timeoutHandling;
    }

    @Override
    public NFA<T> createNFA() {
      // An NFA consists of a set of states, the time window length, and a flag for timeout handling.
      return new NFA<>(states, windowTime, timeoutHandling);
    }
  }
}

NFA stands for non-deterministic finite automaton, also called a non-deterministic finite state machine.

For more information, see

https://zh.wikipedia.org/wiki/Non-deterministic finite state automaton

public class NFA<T> {
  /**
   * The set of all valid NFA states, as returned by NFACompiler.
   * These are directly derived from the user-specified pattern.
   */
  private final Map<String, State<T>> states;

  /**
   * The time window length specified by Pattern.within(Time)
   */
  private final long windowTime;

  /**
   * Flag indicating whether timed-out matches are handled
   */
  private final boolean handleTimeout;
  ...
}
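To give a feel for how states and transitions drive matching, the following is a hand-written automaton for the two-step pattern used later in the demo (an init event with status "02", then an end event with status "00" or "01"). It is a teaching model under simplifying assumptions, not Flink's NFA: it tracks a single partial match for one key, so it is effectively deterministic, and the class and method names are made up for this sketch. The strict flag contrasts next() (strict contiguity) with followedBy() (relaxed contiguity).

```java
import java.util.Arrays;
import java.util.List;

/** Hand-written automaton for the pattern init -> end; a teaching model, not Flink's NFA. */
final class TwoStepNfaSketch {

  private enum State { START, INIT_SEEN, FINAL }

  /**
   * Runs the automaton over the status codes of one key.
   *
   * @param strict true models next() (strict contiguity), false models followedBy()
   * @return true if an init event ("02") is followed by an end event ("00" or "01")
   */
  public static boolean matches(List<String> statuses, boolean strict) {
    State state = State.START;
    for (String status : statuses) {
      boolean isInit = "02".equals(status);
      boolean isEnd = "00".equals(status) || "01".equals(status);
      if (state == State.INIT_SEEN) {
        if (isEnd) {
          state = State.FINAL;
          break;
        }
        if (strict) {
          // Strict contiguity: an unrelated event discards the partial match.
          state = State.START;
        }
        // Relaxed contiguity: the unrelated event is ignored and we keep waiting.
      }
      if (state == State.START && isInit) {
        state = State.INIT_SEEN;
      }
    }
    return state == State.FINAL;
  }

  public static void main(String[] args) {
    List<String> statuses = Arrays.asList("02", "03", "01");
    System.out.println("followedBy: " + matches(statuses, false));
    System.out.println("next: " + matches(statuses, true));
  }
}
```

For the sequence 02, 03, 01 the relaxed variant matches because the 03 event is skipped, while the strict variant does not because the 03 event breaks the sequence.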

PatternSelectFunction and PatternFlatSelectFunction

The select() method of PatternSelectFunction is called with a map of the matched events, keyed by pattern name. The pattern name is the one specified when the Pattern was defined. The select() method returns exactly one result. If you need to return multiple results, implement PatternFlatSelectFunction instead.

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {

  /**
   * Generates a result from the given map of events. The events are identified by the name of the pattern they are associated with
   */
  OUT select(Map<String, List<IN>> pattern) throws Exception;

}

PatternFlatSelectFunction does not return an OUT. Instead, it uses a Collector to emit results for the matched events, so it can produce any number of them.

public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {

  /**
   * Generates one or more results
   */
  void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
}
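The difference between the two interfaces can be sketched without Flink. In the example below, SelectFn and FlatSelectFn are simplified local stand-ins for the two interfaces above (they omit the throws clause, and a java.util.function.Consumer plays the role of Collector); the event strings and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified stand-ins for the two select interfaces; not the Flink API itself. */
final class SelectVersusFlatSelectSketch {

  /** Stand-in for PatternSelectFunction: exactly one result per match. */
  interface SelectFn<IN, OUT> {
    OUT select(Map<String, List<IN>> pattern);
  }

  /** Stand-in for PatternFlatSelectFunction; a Consumer plays the role of Collector. */
  interface FlatSelectFn<IN, OUT> {
    void flatSelect(Map<String, List<IN>> pattern, Consumer<OUT> out);
  }

  /** Builds the map a match carries: pattern name -> events matched under that name. */
  public static Map<String, List<String>> sampleMatch() {
    Map<String, List<String>> match = new HashMap<>();
    match.put("init", Arrays.asList("order-created"));
    match.put("end", Arrays.asList("order-paid"));
    return match;
  }

  /** Produces a single summary string for the match. */
  public static String selectOne(Map<String, List<String>> match) {
    SelectFn<String, String> fn =
        pattern -> pattern.get("init").get(0) + " -> " + pattern.get("end").get(0);
    return fn.select(match);
  }

  /** Emits every matched event individually; emitting nothing would also be allowed. */
  public static List<String> selectMany(Map<String, List<String>> match) {
    FlatSelectFn<String, String> fn = (pattern, out) -> {
      pattern.get("init").forEach(out);
      pattern.get("end").forEach(out);
    };
    List<String> results = new ArrayList<>();
    fn.flatSelect(match, results::add);
    return results;
  }

  public static void main(String[] args) {
    System.out.println(selectOne(sampleMatch()));
    System.out.println(selectMany(sampleMatch()));
  }
}
```

The flat variant is the one to reach for when a match should yield several downstream records, or none at all.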

SelectTimeoutCepOperator, PatternTimeoutFunction

SelectTimeoutCepOperator is created when the createTimeoutPatternStream() method is called in CEPOperatorUtils.

The methods in SelectTimeoutCepOperator that will be called by the operator iteration are processMatchedSequences() and processTimedOutSequences().

The template method... corresponds to the processEvent() method and advanceTime() method in the abstract class AbstractKeyedCEPPatternOperator.

There are also FlatSelectTimeoutCepOperator and the corresponding PatternFlatTimeoutFunction.

public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
    extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {

  private OutputTag<OUT2> timedOutOutputTag;

  public SelectTimeoutCepOperator(
      TypeSerializer<IN> inputSerializer,
      boolean isProcessingTime,
      NFACompiler.NFAFactory<IN> nfaFactory,
      final EventComparator<IN> comparator,
      AfterMatchSkipStrategy skipStrategy,
      // The parameter names are confusingly prefixed with "flat", as are the member names in the SelectWrapper class
      PatternSelectFunction<IN, OUT1> flatSelectFunction,
      PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
      OutputTag<OUT2> outputTag,
      OutputTag<IN> lateDataOutputTag) {
    super(
      inputSerializer,
      isProcessingTime,
      nfaFactory,
      comparator,
      skipStrategy,
      new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
      lateDataOutputTag);
    this.timedOutOutputTag = outputTag;
  }
  ...
}

public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
  OUT timeout(Map<String, List<IN>> pattern, long timeoutTimestamp) throws Exception;
}

public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
  void timeout(Map<String, List<IN>> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
}

CEP and CEPOperatorUtils

CEP is a utility class for creating a PatternStream. A PatternStream is simply the combination of a DataStream and a Pattern.

public class CEP {

  public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
    return new PatternStream<>(input, pattern);
  }

  public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
    return new PatternStream<>(input, pattern, comparator);
  }
}

CEPOperatorUtils creates SingleOutputStreamOperator (DataStream) when the select() method and flatSelect() method of PatternStream are called.

public class CEPOperatorUtils {
  ...
  private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
      final DataStream<IN> inputStream,
      final Pattern<IN, ?> pattern,
      final TypeInformation<OUT> outTypeInfo,
      final boolean timeoutHandling,
      final EventComparator<IN> comparator,
      final OperatorBuilder<IN, OUT> operatorBuilder) {
    final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());

    // check whether we use processing time
    final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    // compile our pattern into a NFAFactory to instantiate NFAs later on
    final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);

    final SingleOutputStreamOperator<OUT> patternStream;

    if (inputStream instanceof KeyedStream) {
      KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
      patternStream = keyedStream.transform(
        operatorBuilder.getKeyedOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()));
    } else {
      KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
      patternStream = inputStream.keyBy(keySelector).transform(
        operatorBuilder.getOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()
        )).forceNonParallel();
    }

    return patternStream;
  }
  ...
}
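The effect of the two branches above can be pictured with plain collections. This sketch is only an analogy for how keyBy partitions a stream: the partition() helper and the sample ids are made up, and a constant Byte key is used in the spirit of NullByteKeySelector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Analogy for keyBy partitioning using plain collections; not Flink code. */
final class KeyingSketch {

  /** Groups event ids by the given key function, as keyBy conceptually partitions a stream. */
  public static <K> Map<K, List<String>> partition(List<String> aids, Function<String, K> keySelector) {
    return aids.stream()
        .collect(Collectors.groupingBy(keySelector, TreeMap::new, Collectors.toList()));
  }

  public static void main(String[] args) {
    List<String> aids = Arrays.asList("ID000-0", "ID000-1", "ID000-0");
    // Keyed by aid: each aid gets its own partition and therefore its own matching state.
    System.out.println(partition(aids, aid -> aid));
    // Constant key: every event lands in a single partition.
    System.out.println(partition(aids, aid -> Byte.valueOf((byte) 0)));
  }
}
```

With a constant key there is only one group, which is consistent with the non-keyed branch calling forceNonParallel(): all events are matched by a single operator instance.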

FlinkCEP Implementation Steps

  1. IN: DataSource -> DataStream -> Transformations -> DataStream
  2. Pattern: Pattern.begin.where.next.where...times...
  3. PatternStream: CEP.pattern(DataStream, Pattern)
  4. DataStream: PatternStream.select(PatternSelectFunction) or PatternStream.flatSelect(PatternFlatSelectFunction)
  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP Matching Timeout Implementation Steps

Timeout detection with CEP operates on a keyed stream, that is, a KeyedStream produced by keyBy. If the inputStream is not a KeyedStream, a constant zero-byte key is created for it (as shown in the CEPOperatorUtils source code above).

KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();

The Pattern definition ends with a call to within(), which sets the window time. If you key the stream by primary key, at most one timeout event is matched per key in a time window, so you can use PatternStream.select(...).

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)
  3. PatternStream: CEP.pattern(KeyedStream, Pattern)
  4. OutputTag: new OutputTag(...)
  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

Limitations of FlinkCEP timeouts

As with Flink window aggregation, if you use event time and the watermark is derived from the events themselves, time only moves forward when later events arrive. A timeout is therefore detected and emitted only after a subsequent event pushes the watermark past the end of the window.
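The dependence on watermark progress can be sketched with a small model. This is not Flink's implementation: WatermarkTimeoutSketch, onInit(), and advanceWatermark() are hypothetical names, and the expiry rule (watermark at or past init timestamp plus window) is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of event-time timeout detection; not Flink's implementation. */
final class WatermarkTimeoutSketch {
  private final long windowMillis;
  /** Pending partial matches: key -> timestamp of the init event. */
  private final Map<String, Long> pending = new LinkedHashMap<>();

  public WatermarkTimeoutSketch(long windowMillis) {
    this.windowMillis = windowMillis;
  }

  /** Records an init event; nothing can time out until the watermark moves. */
  public void onInit(String key, long timestamp) {
    pending.put(key, timestamp);
  }

  /** Advances event time and returns the keys whose window has expired (assumed rule). */
  public List<String> advanceWatermark(long watermark) {
    List<String> timedOut = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (watermark >= entry.getValue() + windowMillis) {
        timedOut.add(entry.getKey());
        it.remove();
      }
    }
    return timedOut;
  }

  public static void main(String[] args) {
    WatermarkTimeoutSketch detector = new WatermarkTimeoutSketch(60_000L);
    detector.onInit("order-1", 0L);
    // No later event arrives, so the watermark stays put and nothing is reported.
    System.out.println(detector.advanceWatermark(0L));
    // A later event pushes the watermark past the window, and the timeout is reported.
    System.out.println(detector.advanceWatermark(61_000L));
  }
}
```

If no further events arrive for any key, advanceWatermark() is never called with a later timestamp, and the pending timeout stays invisible to downstream consumers.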

Complete FlinkCEP timeout demo

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// POJO and POJOSchema are the author's own classes and are not shown in this article.
public class CEPTimeoutEventJob {
  private static final String LOCAL_KAFKA_BROKER = "localhost:9092";
  private static final String GROUP_ID = CEPTimeoutEventJob.class.getSimpleName();
  private static final String GROUP_TOPIC = GROUP_ID;

  public static void main(String[] args) throws Exception {
    // ParameterTool params = ParameterTool.fromArgs(args);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Use event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 10000));

    // Do not use the time carried in the POJO; use ingestion time instead
    final AssignerWithPeriodicWatermarks<POJO> extractor = new IngestionTimeExtractor<POJO>();

    // Keep consistent with the number of partitions of the Kafka topic
    env.setParallelism(3);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", GROUP_ID);

    // Read messages from Kafka
    FlinkKafkaConsumer011<POJO> consumer = new FlinkKafkaConsumer011<>(GROUP_TOPIC, new POJOSchema(), kafkaProps);
    DataStream<POJO> pojoDataStream = env.addSource(consumer)
        .assignTimestampsAndWatermarks(extractor);
    pojoDataStream.print();

    // Group by the primary key aid, that is, run match detection for each POJO entity
    // [Different types of POJOs can use different within times]
    // 1.
    DataStream<POJO> keyedPojos = pojoDataStream
        .keyBy("aid");

    // From the initial state to a final state: a complete POJO event sequence
    // 2.
    Pattern<POJO, POJO> completedPojo =
        Pattern.<POJO>begin("init")
            .where(new SimpleCondition<POJO>() {
              private static final long serialVersionUID = -6847788055093903603L;

              @Override
              public boolean filter(POJO pojo) throws Exception {
                return "02".equals(pojo.getAstatus());
              }
            })
            .followedBy("end")
            // .next("end")
            .where(new SimpleCondition<POJO>() {
              private static final long serialVersionUID = -2655089736460847552L;

              @Override
              public boolean filter(POJO pojo) throws Exception {
                return "00".equals(pojo.getAstatus()) || "01".equals(pojo.getAstatus());
              }
            });

    // Find the aids whose events have not reached a final state within 1 minute [for testing purposes]
    // If different types need different within times, for example 1 minute for some and 1 hour for others,
    // generate multiple PatternStreams
    // 3.
    PatternStream<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(Time.minutes(1)));

    // Define the side output "timedout"
    // 4.
    OutputTag<POJO> timedout = new OutputTag<POJO>("timedout") {
      private static final long serialVersionUID = 773503794597666247L;
    };

    // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
    // 5.
    SingleOutputStreamOperator<POJO> timeoutPojos = patternStream.flatSelect(
        timedout,
        new POJOTimedOut(),
        new FlatSelectNothing()
    );

    // Print the timed-out POJOs
    // 6. 7.
    timeoutPojos.getSideOutput(timedout).print();
    timeoutPojos.print();
    env.execute(CEPTimeoutEventJob.class.getSimpleName());
  }

  /**
   * Collects the timed-out events
   */
  public static class POJOTimedOut implements PatternFlatTimeoutFunction<POJO, POJO> {
    private static final long serialVersionUID = -4214641891396057732L;

    @Override
    public void timeout(Map<String, List<POJO>> map, long l, Collector<POJO> collector) throws Exception {
      if (null != map.get("init")) {
        for (POJO pojoInit : map.get("init")) {
          System.out.println("timeout init:" + pojoInit.getAid());
          collector.collect(pojoInit);
        }
      }
      // Because the match timed out before any end event was received, no end event is available here.
      System.out.println("timeout end: " + map.get("end"));
    }
  }

  /**
   * Usually does nothing, but could also send all matched events downstream.
   * With relaxed contiguity, events that were skipped or ignored cannot be selected and sent downstream.
   * Called for sequences whose init and end events both arrive within one minute.
   *
   * @param <T>
   */
  public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
    private static final long serialVersionUID = -3029589950677623844L;

    @Override
    public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {
      System.out.println("flatSelect: " + pattern);
    }
  }
}

Test results (followedBy):

3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419728242, energy=529.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-0', astyle='STYLE000-0', aname='NAME-0', logTime=1563419749259, energy=492.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}]}
timeout init:ID000-1
3> POJO{aid='ID000-1', astyle='STYLE000-2', aname='NAME-1', logTime=1563419728783, energy=348.00, age=26, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419829639, energy=467.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-2', astyle='STYLE000-0', aname='NAME-0', logTime=1563419841394, energy=107.00, age=0, tt=2019-07-18, astatus='00', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419979567, energy=32.00, age=26, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
3> POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}
flatSelect: {init=[POJO{aid='ID000-3', astyle='STYLE000-0', aname='NAME-0', logTime=1563419967721, energy=431.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}], end=[POJO{aid='ID000-3', astyle='STYLE000-2', aname='NAME-0', logTime=1563419993612, energy=542.00, age=26, tt=2019-07-18, astatus='01', createTime=null, updateTime=null}]}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420078008, energy=275.00, age=0, tt=2019-07-18, astatus='03', createTime=null, updateTime=null}
timeout init:ID000-4
3> POJO{aid='ID000-4', astyle='STYLE000-0', aname='NAME-0', logTime=1563420063760, energy=122.00, age=0, tt=2019-07-18, astatus='02', createTime=null, updateTime=null}
timeout end: null
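To help read the log above, the sketch below replays its ten input events through a simplified init -> end matcher in plain Java. It is a model under stated assumptions, not the Flink job: the logged logTime is used as the event timestamp (the real job assigns ingestion-time timestamps), only one partial match is tracked per aid, and a final call with Long.MAX_VALUE stands in for time continuing to advance after the last event. All class and method names are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Replays the logged events through a simplified init -> end matcher; not Flink code. */
final class TimeoutReplaySketch {
  private static final long WINDOW_MILLIS = 60_000L; // mirrors within(Time.minutes(1))

  private final Map<String, Long> pending = new LinkedHashMap<>(); // aid -> init timestamp
  public final List<String> matched = new ArrayList<>();
  public final List<String> timedOut = new ArrayList<>();

  /** Expires partial matches whose window has passed at the given watermark. */
  public void advanceTime(long watermark) {
    Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (watermark - entry.getValue() >= WINDOW_MILLIS) {
        timedOut.add(entry.getKey());
        it.remove();
      }
    }
  }

  /** Processes one event; the logged logTime is used as its timestamp (an assumption). */
  public void onEvent(String aid, String astatus, long timestamp) {
    advanceTime(timestamp);
    if ("02".equals(astatus)) {
      pending.putIfAbsent(aid, timestamp);
    } else if (("00".equals(astatus) || "01".equals(astatus)) && pending.remove(aid) != null) {
      matched.add(aid);
    }
    // Any other status is skipped, as with followedBy.
  }

  /** Replays the ten input events from the log and then flushes the remaining timeouts. */
  public static TimeoutReplaySketch replayLog() {
    TimeoutReplaySketch job = new TimeoutReplaySketch();
    job.onEvent("ID000-0", "02", 1563419728242L);
    job.onEvent("ID000-1", "02", 1563419728783L);
    job.onEvent("ID000-0", "00", 1563419749259L);
    job.onEvent("ID000-2", "03", 1563419829639L);
    job.onEvent("ID000-2", "00", 1563419841394L);
    job.onEvent("ID000-3", "02", 1563419967721L);
    job.onEvent("ID000-3", "03", 1563419979567L);
    job.onEvent("ID000-3", "01", 1563419993612L);
    job.onEvent("ID000-4", "02", 1563420063760L);
    job.onEvent("ID000-4", "03", 1563420078008L);
    job.advanceTime(Long.MAX_VALUE);
    return job;
  }

  public static void main(String[] args) {
    TimeoutReplaySketch job = replayLog();
    System.out.println("matched: " + job.matched);
    System.out.println("timed out: " + job.timedOut);
  }
}
```

The model reports ID000-0 and ID000-3 as matched and ID000-1 and ID000-4 as timed out, which lines up with the flatSelect and "timeout init" lines in the log. ID000-2 produces nothing because it never had an init event with status 02.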

Summary

That concludes the steps for implementing timeout status monitoring with Apache FlinkCEP. I hope you find them helpful. If you have any questions, please leave me a message and I will reply as soon as I can.
