If you want to use a `udf` in a `Pipeline` you need one of the following:

- a custom `Transformer`
- `SQLTransformer`
The first is very verbose for such a simple use case, so I recommend the second option:
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

# Register the udf so it can be called by name from SQL
charcount_q1 = spark.udf.register(
    "charcount_q1",
    lambda row: sum(len(char) for char in row),
    "integer"
)

df = spark.createDataFrame(
    [(1, ["spark", "java", "python"])], ("id", "question1"))

# __THIS__ is replaced by the input DataFrame at transform time
pipeline = Pipeline(stages=[SQLTransformer(
    statement="SELECT *, charcount_q1(question1) charcountq1 FROM __THIS__"
)])

pipeline.fit(df).transform(df).show()
# +---+--------------------+-----------+
# | id|           question1|charcountq1|
# +---+--------------------+-----------+
# |  1|[spark, java, pyt...|         15|
# +---+--------------------+-----------+
```