Flink4s
Scala 3.x wrapper for Apache Flink. Scala is not a priority for the core project, and thus support for the latest versions of Scala is missing. This project attempts to leverage the upcoming Scala-free Flink 1.15 release to provide a viable wrapper for Scala 3.x.
Usage
Use it as you would use `flink-streaming-scala`. Most methods are identical:
```scala
import com.ariskk.flink4s.StreamExecutionEnvironment
import org.apache.flink.api.common.typeinfo.TypeInformation

final case class Counter(id: String, count: Int)

object Counter:
  // No automatic derivation: the instance is provided explicitly
  given typeInfo: TypeInformation[Counter] = TypeInformation.of(classOf[Counter])

val items = (1 to 1000).map(x => s"item-${x % 10}")
val stream = StreamExecutionEnvironment.fromCollection(items)
  .map(x => Counter(x, 1))
  .keyBy(_.id)
  // Running sum of counts per key
  .reduce((acc, v) => acc.copy(count = acc.count + v.count))
```
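To make the semantics of the keyed reduce concrete, here is a plain-Scala sketch with no Flink dependency. `keyedRunningReduce` is a hypothetical helper, not part of flink4s; it mimics the rolling reduce of Flink's DataStream API, which emits one updated aggregate per incoming element, tracked separately for each key.

```scala
import scala.collection.mutable

final case class Counter(id: String, count: Int)

// Hypothetical helper (not part of flink4s): emulates keyBy(key).reduce(f) on a
// finite collection. Like a rolling reduce, it emits one value per input: the
// first element of a key as-is, then f(previousAggregate, element).
def keyedRunningReduce[T, K](xs: Seq[T])(key: T => K)(f: (T, T) => T): Vector[T] =
  val state = mutable.Map.empty[K, T] // last aggregate seen for each key
  xs.iterator.map { x =>
    val k = key(x)
    val next = state.get(k).fold(x)(acc => f(acc, x))
    state(k) = next
    next
  }.toVector

val items = (1 to 1000).map(x => s"item-${x % 10}")
val emitted = keyedRunningReduce(items.map(x => Counter(x, 1)))(_.id)(
  (acc, v) => acc.copy(count = acc.count + v.count)
)
```

With 1,000 items spread over 10 keys, 1,000 aggregates are emitted, and the last one for each key has a count of 100.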
Or if you want to get fancy:
```scala
import cats.Semigroup

given semigroup: Semigroup[Counter] with
  def combine(x: Counter, y: Counter): Counter = Counter(x.id, x.count + y.count)

val stream = StreamExecutionEnvironment.fromCollection(items)
  .map(x => Counter(x, 1))
  .keyBy(_.id)
  .combine // combines elements of the same key using the Semigroup instance
```
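The `Semigroup` route should compute the same result as the explicit lambda. Below is a quick local check using cats alone (no Flink); that flink4s's `.combine` amounts to reducing each key with the instance's `combine` is an assumption drawn from the two examples. Note that `combine` keeps `x.id`, which is only sound because `keyBy(_.id)` guarantees both arguments share the same key.

```scala
import cats.Semigroup
import cats.syntax.semigroup.*

final case class Counter(id: String, count: Int)

given semigroup: Semigroup[Counter] with
  // Keeps the left id: only meaningful when both values share a key.
  def combine(x: Counter, y: Counter): Counter = Counter(x.id, x.count + y.count)

// The reduce function from the first example, for comparison.
val reduceFn: (Counter, Counter) => Counter =
  (acc, v) => acc.copy(count = acc.count + v.count)

val batch = List(Counter("item-1", 1), Counter("item-1", 2), Counter("item-1", 3))
val viaSemigroup = batch.reduce(_ |+| _)
val viaLambda = batch.reduce(reduceFn)

// Associativity: the grouping of combines does not change the result.
val associative =
  ((batch(0) |+| batch(1)) |+| batch(2)) == (batch(0) |+| (batch(1) |+| batch(2)))
```

Both reductions yield `Counter("item-1", 6)`; the parentheses around each side of `==` are needed because `|+|` binds more loosely than `==`.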
Caveats
A number of features found in `flink-streaming-scala` are (intentionally) missing:
- No automatic derivation of `TypeInformation` instances via macros. Macro-based derivation makes compile times scale very poorly as the codebase grows. Instances have to be provided explicitly instead (as with `TypeInformation.of` in the example above), which means that some custom serializers Flink provides won't be used. For more context, check this.
- No closure cleaner. It is very hard to implement and generally imperfect. If all stream processing logic lives in `object`s, then there is no need for a closure cleaner in the first place.
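Both caveats shape how application code is written. The sketch below assumes flink-core's `TypeInformation` and `Types` are on the classpath: instances are declared explicitly rather than derived, and the stream functions live in a top-level `object`. The `typeInfoOf` helper is hypothetical (not part of flink4s); because it relies on a `ClassTag`, it only sees erased runtime classes and is unsuitable for generic types such as `List[String]`. The `String` instance is only needed if the wrapper does not already provide one, which is an assumption.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import scala.reflect.ClassTag

final case class Counter(id: String, count: Int)

// Hypothetical helper (not part of flink4s): builds TypeInformation from the
// runtime class captured by a ClassTag. No macros, but type arguments are
// erased, so use it only for non-generic classes.
def typeInfoOf[T](using ct: ClassTag[T]): TypeInformation[T] =
  TypeInformation.of(ct.runtimeClass.asInstanceOf[Class[T]])

object Counter:
  // A case class is not a Flink POJO (no no-arg constructor or setters), so
  // Flink will typically treat it as a generic type, not a dedicated serializer.
  given TypeInformation[Counter] = typeInfoOf[Counter]

// Assumption: only needed if the wrapper does not already provide it.
given stringTypeInfo: TypeInformation[String] = Types.STRING

// Stream logic in a top-level object: lambdas calling these methods reach the
// object through its static instance instead of capturing an enclosing class
// instance, so there is nothing for a closure cleaner to remove. Illustrative use:
//   .map(CounterLogic.toCounter).keyBy(_.id).reduce(CounterLogic.merge)
object CounterLogic:
  def toCounter(item: String): Counter = Counter(item, 1)
  def merge(acc: Counter, v: Counter): Counter = acc.copy(count = acc.count + v.count)
```

Keeping functions as plain methods on an `object` also makes them easy to unit test without starting a Flink job.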