本文主要研究一下flink Table的Over Windows
Table table = input
.window([OverWindow w].as("w")) // define over window with alias w
.select("a, b.sum over w, c.min over w"); // aggregate over the over window w
// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
class Table(
private[flink] val tableEnv: TableEnvironment,
private[flink] val logicalPlan: LogicalNode) {
def window(overWindows: OverWindow*): OverWindowedTable = {
if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
throw new TableException("Over-windows for batch tables are currently not supported.")
if (overWindows.size != 1) {
throw new TableException("Over-Windows are currently only supported single window.")
new OverWindowedTable(this, overWindows.toArray)
* Over window is similar to the traditional OVER SQL.
case class OverWindow(
private[flink] val alias: Expression,
private[flink] val partitionBy: Seq[Expression],
private[flink] val orderBy: Expression,
private[flink] val preceding: Expression,
private[flink] val following: Expression)
object Over {
* Specifies the time attribute on which rows are grouped.
* For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
* For batch tables, refer to a timestamp or long attribute.
def orderBy(orderBy: String): OverWindowWithOrderBy = {
val orderByExpr = ExpressionParser.parseExpression(orderBy)
new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
* Partitions the elements on some partition keys.
* @param partitionBy some partition keys.
* @return A partitionedOver instance that only contains the orderBy method.
def partitionBy(partitionBy: String): PartitionedOver = {
val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
new PartitionedOver(partitionByExpr)
class OverWindowWithOrderBy(
private val partitionByExpr: Array[Expression],
private val orderByExpr: Expression) {
* Set the preceding offset (based on time or row-count intervals) for over window.
* @param preceding preceding offset relative to the current row.
* @return this over window
def preceding(preceding: String): OverWindowWithPreceding = {
val precedingExpr = ExpressionParser.parseExpression(preceding)
new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
class PartitionedOver(private val partitionByExpr: Array[Expression]) {
* Specifies the time attribute on which rows are grouped.
* For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
* For batch tables, refer to a timestamp or long attribute.
def orderBy(orderBy: String): OverWindowWithOrderBy = {
val orderByExpr = ExpressionParser.parseExpression(orderBy)
new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
class OverWindowWithPreceding(
private val partitionBy: Seq[Expression],
private val orderBy: Expression,
private val preceding: Expression) {
private[flink] var following: Expression = _
* Assigns an alias for this window that the following `select()` clause can refer to.
* @param alias alias for this over window
* @return over window
def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
* Assigns an alias for this window that the following `select()` clause can refer to.
* @param alias alias for this over window
* @return over window
def as(alias: Expression): OverWindow = {
// set following to CURRENT_ROW / CURRENT_RANGE if not defined
if (null == following) {
if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
following = CURRENT_ROW
} else {
following = CURRENT_RANGE
OverWindow(alias, partitionBy, orderBy, preceding, following)
* Set the following offset (based on time or row-count intervals) for over window.
* @param following following offset that relative to the current row.
* @return this over window
def following(following: String): OverWindowWithPreceding = {
* Set the following offset (based on time or row-count intervals) for over window.
* @param following following offset that relative to the current row.
* @return this over window
def following(following: Expression): OverWindowWithPreceding = {
this.following = following
class OverWindowedTable(
private[flink] val table: Table,
private[flink] val overWindows: Array[OverWindow]) {
def select(fields: Expression*): Table = {
val expandedFields = expandProjectList(
throw new ValidationException(
"Window start and end properties are not available for Over windows.")
val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
new Table(
// required for proper projection push down
explicitAlias = true)
def select(fields: String): Table = {
val fieldExprs = ExpressionParser.parseExpressionList(fields)
//get the correct expression for AggFunctionCall
val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
select(withResolvedAggFunctionCall: _*)
如有侵权,请联系 cloudcommunity@tencent.com 删除。
如有侵权,请联系 cloudcommunity@tencent.com 删除。