您现在的位置是:主页 > news > 顺的做网站便宜吗/百度精准营销获客平台

顺的做网站便宜吗/百度精准营销获客平台

admin2025/6/23 10:57:13news

简介顺的做网站便宜吗,百度精准营销获客平台,湖南建设银行宣传部网站,学做电商的步骤Evictors Flink的窗口模型允许除了WindowAssigner和Trigger之外还指定一个可选的Evictor。可以使用evictor(…)方法完成此操作。Evictors可以在触发器触发后&#xff0c;应用Window Function之前或之后从窗口中删除元素。 public interface Evictor<T, W extends Window&g…

顺的做网站便宜吗,百度精准营销获客平台,湖南建设银行宣传部网站,学做电商的步骤Evictors Flink的窗口模型允许除了WindowAssigner和Trigger之外还指定一个可选的Evictor。可以使用evictor(…)方法完成此操作。Evictors可以在触发器触发后&#xff0c;应用Window Function之前或之后从窗口中删除元素。 public interface Evictor<T, W extends Window&g…

Evictors

Flink的窗口模型允许除了WindowAssigner和Trigger之外还指定一个可选的Evictor。可以使用evictor(…)方法完成此操作。Evictors可以在触发器触发后,应用Window Function之前或之后从窗口中删除元素。

public interface Evictor<T, W extends Window> extends Serializable {/*** Optionally evicts elements. Called before windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** Optionally evicts elements. Called after windowing function.** @param elements The elements currently in the pane.* @param size The current number of elements in the pane.* @param window The {@link Window}* @param evictorContext The context for the Evictor*/void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);/*** A context object that is given to {@link Evictor} methods.*/interface EvictorContext {/*** Returns the current processing time.*/long getCurrentProcessingTime();/*** Returns the metric group for this {@link Evictor}. This is the same metric* group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user* function.** <p>You must not call methods that create metric objects* (such as {@link MetricGroup#counter(int)} multiple times but instead call once* and store the metric object in a field.*/MetricGroup getMetricGroup();/*** Returns the current watermark time.*/long getCurrentWatermark();}
}

evictBefore()包含要在窗口函数之前应用的剔除逻辑,而evictAfter()包含要在窗口函数之后应用的剔除逻辑。应用窗口功能之前剔除的元素将不会被其处理。

Flink DataStream Window内置了三种剔除器: CountEvictor、DeltaEvictor、TimeEvictor。

  • CountEvictor: 数量剔除器。在Window中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
  • DeltaEvictor: 阈值剔除器。计算Window中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
  • TimeEvictor: 时间剔除器。保留Window中最近一段时间内的元素,并丢弃其余元素。
CountEvictor

1.窗口中数据总条数<=要保留的数据条数(maxCount),不剔除。
2.否则,从Window头部遍历并剔除。

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (size <= maxCount) { //总条数<=要保留的数据条数(maxCount),不剔除return;} else { //否则,遍历剔除int evictedCount = 0;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){iterator.next();evictedCount++;if (evictedCount > size - maxCount) {break;} else {iterator.remove();}}}
}
DeltaEvictor

1.先找到当前窗口的最后一条元素。
2.遍历窗口中的每一条元素。每条元素(A)和最后一条元素(L),依据用户提供DeltaFunction计算出一个Delta。计算出的Delta大于等于设定的阈值,则剔除该元素(A)。

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {// 找到当前窗口的最后一条元素TimestampedValue<T> lastElement = Iterables.getLast(elements);// 遍历每条元素并计算Delta。计算出的Delta大于等于设定的阈值,则剔除该条元素for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){TimestampedValue<T> element = iterator.next();if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}
}
TimeEvictor

1.找到当前窗口时间截断点: 当前窗口最大时间点-要保留的时间段。
2.遍历窗口中的每一条元素。某条元素的时间<=截断点,则剔除该条元素。
一句话,即保留Window最近一段时间内的数据。
在这里插入图片描述

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (!hasTimestamp(elements)) {return;}// 当前Window的最大时间戳long currentTime = getMaxTimestamp(elements);// 截断点long evictCutoff = currentTime - windowSize;// 当某条元素的时间戳<=截断点,则剔除该元素for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}
}
案例
public class UserDefineEvictor implements Evictor<String, TimeWindow> {private Boolean isEvictorAfter = false;private String excludeContent = null;public UserDefineEvictor(Boolean isEvictorAfter, String excludeContent) {this.isEvictorAfter = isEvictorAfter;this.excludeContent = excludeContent;}@Overridepublic void evictBefore(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {if(!isEvictorAfter){evict(elements,size,window,evictorContext);}}@Overridepublic void evictAfter(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {if(isEvictorAfter){evict(elements,size,window,evictorContext);}}private void evict(Iterable<TimestampedValue<String>> elements,int size,TimeWindow window,EvictorContext evictorContext){for(Iterator<TimestampedValue<String>> iterator = elements.iterator();iterator.hasNext();){TimestampedValue<String> element = iterator.next();//将含有相关内容元素删除System.out.println(element.getValue());if(element.getValue().contains(excludeContent)){iterator.remove();}}}
}
object FlinkSlidingCountEvictor {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("train",9999)val counts = text.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2))).evictor(new UserDefineEvictor(false,"error")).apply(new UserDefineSlidingWindowFunction)counts.print()env.execute()}
}
class UserDefineSlidingWindowFunction extends AllWindowFunction[String,String,TimeWindow] {override def apply(window: TimeWindow,input: Iterable[String],out: Collector[String]): Unit = {val sdf = new SimpleDateFormat("HH:mm:ss")var start=sdf.format(window.getStart)var end=sdf.format(window.getEnd)var windowContent=input.toListprintln("window:"+start+"\t"+end+" "+windowContent.mkString(" | "))}
}