无需将数据写入磁盘然后使用它进行读取 MLUtils.readLibSVM 。原因如下。
MLUtils.readLibSVM
MLUtils.readLibSVM 期望一个文本文件,其中每一行是稀疏特征向量及其相关标签。它使用以下格式表示标签 - 特征向量对:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
哪里 <feature> 是后续的索引 value 在特征向量中。 MLUtils.readLibSVM 可以读取具有此格式的文件并转换为a中的每一行 LabeledVector 实例。因此,你获得了一个 DataSet[LabeledVector] 在阅读了libSVM文件之后。这正是您需要的输入格式 SVM 和 MultipleLinearRegression 预测。
<feature>
value
LabeledVector
DataSet[LabeledVector]
SVM
MultipleLinearRegression
但是,根据您从HBase获得的数据格式,首先必须将数据转换为 libSVM 格式。除此以外, MLUtils.readLibSVM 将无法读取书面文件。如果您转换数据,那么您也可以直接将数据转换为 DataSet[LabeledVector] 并将其用作Flink ML算法的输入。这避免了不必要的磁盘周期。
libSVM
如果你从HBase获得a DataSet[String] 每个字符串都有 libSVM 格式(参见上面的规范),然后你可以申请一个 map 在HBase上运行 DataSet 具有以下地图功能。
DataSet[String]
map
DataSet
val hbaseInput: DataSet[String] = ... val labelCOODS = hbaseInput.flatMap { line => // remove all comments which start with a '#' val commentFreeLine = line.takeWhile(_ != '#').trim if(commentFreeLine.nonEmpty) { val splits = commentFreeLine.split(' ') val label = splits.head.toDouble val sparseFeatures = splits.tail val coos = sparseFeatures.map { str => val pair = str.split(':') require( pair.length == 2, "Each feature entry has to have the form <feature>:<value>") // libSVM index is 1-based, but we expect it to be 0-based val index = pair(0).toInt - 1 val value = pair(1).toDouble (index, value) } Some((label, coos)) } else { None } // Calculate maximum dimension of vectors val dimensionDS = labelCOODS.map { labelCOO => labelCOO._2.map( _._1 + 1 ).max }.reduce(scala.math.max(_, _)) val labeledVectors: DataSet[LabeledVector] = labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] { var dimension = 0 override def open(configuration: Configuration): Unit = { dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0) } override def map(value: (Double, Array[(Int, Double)])): LabeledVector = { new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2)) } }}.withBroadcastSet(dimensionDS, DIMENSION)
这会将您的libSVM格式数据转换为数据集 LabeledVectors 。
LabeledVectors