diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala index d54fc43d3..815acdd55 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/LocalFeatureJoinJob.scala @@ -17,6 +17,9 @@ import org.apache.spark.sql.SparkSession */ object LocalFeatureJoinJob { + // This is a config for local test only to induce a FileNotFoundException. + var shouldRetryAddingSWAFeatures = false + // for user convenience, create spark session within this function, so user does not need to create one // this also ensure it has same setting as the real feathr join job val ss: SparkSession = createSparkSession(enableHiveSupport = true) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala index b37d9d62f..a2ed52069 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/DataFrameFeatureJoiner.scala @@ -327,7 +327,7 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d offline.FeatureDataFrame(obsToJoinWithFeatures, Map()) } else { val swaJoiner = new SlidingWindowAggregationJoiner(featureGroups.allWindowAggFeatures, anchorToDataSourceMapper) - swaJoiner.joinWindowAggFeaturesAsDF( + val (featureDataFrame, retryableFeatureNames) = swaJoiner.joinWindowAggFeaturesAsDF( ss, obsToJoinWithFeatures, joinConfig, @@ -338,6 +338,28 @@ private[offline] class DataFrameFeatureJoiner(logicalPlan: MultiStageJoinPlan, d swaObsTime, failOnMissingPartition, swaHandler) + + // We will retry the SWA features which could not added because of changing data. + val retryableErasedEntityTaggedFeatures = requiredWindowAggFeatures.filter(x => retryableFeatureNames.contains(x.getFeatureName)) + + // Keep only the features which are to be retried. + val updatedWindowAggFeatureStages = windowAggFeatureStages.map(x => (x._1, x._2.intersect(retryableFeatureNames))) + if (retryableFeatureNames.nonEmpty) { + swaJoiner.joinWindowAggFeaturesAsDF( + ss, + featureDataFrame.df, + joinConfig, + keyTagIntsToStrings, + updatedWindowAggFeatureStages, + retryableErasedEntityTaggedFeatures, + bloomFilters, + swaObsTime, + failOnMissingPartition, + swaHandler, + false)._1 + } else { + featureDataFrame + } } } } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala index 6f3458df7..9aac821cd 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala @@ -9,7 +9,7 @@ import com.linkedin.feathr.offline.anchored.keyExtractor.{MVELSourceKeyExtractor import com.linkedin.feathr.offline.client.DataFrameColName import com.linkedin.feathr.offline.config.FeatureJoinConfig import com.linkedin.feathr.offline.exception.FeathrIllegalStateException -import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager +import com.linkedin.feathr.offline.job.{LocalFeatureJoinJob, PreprocessedDataFrameManager} import com.linkedin.feathr.offline.join.DataFrameKeyCombiner import com.linkedin.feathr.offline.source.DataSource import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor @@ -22,11 +22,14 @@ import com.linkedin.feathr.offline.{FeatureDataFrame, JoinStage} import com.linkedin.feathr.swj.{FactData, LabelData, SlidingWindowJoin} import com.linkedin.feathr.{common, offline} import org.apache.logging.log4j.LogManager +import org.apache.spark.SparkException import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.util.sketch.BloomFilter +import java.io.FileNotFoundException import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer /** * Case class containing other SWA handler methods @@ -65,6 +68,9 @@ private[offline] class SlidingWindowAggregationJoiner( * @param obsDF Observation data * @param swaObsTimeOpt start and end time of observation data * @param failOnMissingPartition whether to fail the data loading if some of the date partitions are missing. + * @param swaHandler External SWA libraries if any should handle the SWA join + * @param shouldRetry If this is a retry attempt to retry adding features which were missed because of IOExceptions. + * Default is set to true. * @return pair of : * 1) dataframe with feature column appended to the obsData, * it can be converted to a pair RDD of (observation data record, feature record), @@ -81,7 +87,9 @@ private[offline] class SlidingWindowAggregationJoiner( bloomFilters: Option[Map[Seq[Int], BloomFilter]], swaObsTimeOpt: Option[DateTimeInterval], failOnMissingPartition: Boolean, - swaHandler: Option[SWAHandler]): FeatureDataFrame = { + swaHandler: Option[SWAHandler], + shouldRetry: Boolean = true): (FeatureDataFrame, Seq[String]) = { + val retryableSwaFeatures = ArrayBuffer.empty[String] val joinConfigSettings = joinConfig.settings // extract time window settings if (joinConfigSettings.isEmpty) { @@ -255,11 +263,23 @@ private[offline] class SlidingWindowAggregationJoiner( SlidingWindowFeatureUtils.getFactDataDef(filteredFactData, anchorWithSourceToDFMap.keySet.toSeq, featuresToDelayImmutableMap, selectedFeatures) } val origContextObsColumns = labelDataDef.dataSource.columns + val shouldRetryForMissingData = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean + try { + // THIS IS FOR LOCAL TEST ONLY. It is to induce a spark exception with the root cause of FileNotFoundException. + if (shouldRetry && FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES).toBoolean) + throw new SparkException("file not found", new FileNotFoundException()) + contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList) + } catch { + // Many times the files which are to be loaded gets deleted midway. We will retry all the features at this stage again by reloading the datasets. + case exception: SparkException => if (shouldRetry && shouldRetryForMissingData && exception.getCause != null && exception.getCause.isInstanceOf[FileNotFoundException]) { + val unjoinedFeatures = factDataDefs.flatMap(factData => factData.aggFeatures.map(_.name)) + retryableSwaFeatures ++= unjoinedFeatures + } + } - contextDF = if (swaHandler.isDefined) swaHandler.get.join(labelDataDef, factDataDefs.toList) else SlidingWindowJoin.join(labelDataDef, factDataDefs.toList) - + val finalJoinedFeatures = joinedFeatures diff retryableSwaFeatures contextDF = if (shouldFilterNulls && !factDataRowsWithNulls.isEmpty) { - val nullDfWithFeatureCols = joinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null))) + val nullDfWithFeatureCols = finalJoinedFeatures.foldLeft(factDataRowsWithNulls)((s, x) => s.withColumn(x, lit(null))) contextDF.union(nullDfWithFeatureCols) } else contextDF @@ -272,13 +292,13 @@ private[offline] class SlidingWindowAggregationJoiner( .asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat) val FeatureDataFrame(withFDSFeatureDF, inferredTypes) = - SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, joinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig) + SlidingWindowFeatureUtils.convertSWADFToFDS(contextDF, finalJoinedFeatures.toSet, featureNameToColumnFormat, userSpecifiedTypesConfig) // apply default on FDS dataset val withFeatureContextDF = - substituteDefaults(withFDSFeatureDF, defaults.keys.filter(joinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss) + substituteDefaults(withFDSFeatureDF, defaults.keys.filter(finalJoinedFeatures.contains).toSeq, defaults, userSpecifiedTypesConfig, ss) allInferredFeatureTypes ++= inferredTypes - contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, joinedFeatures, keyTags.map(keyTagList)) + contextDF = standardizeFeatureColumnNames(origContextObsColumns, withFeatureContextDF, finalJoinedFeatures, keyTags.map(keyTagList)) if (shouldCheckPoint(ss)) { // checkpoint complicated dataframe for each stage to avoid Spark failure contextDF = contextDF.checkpoint(true) @@ -292,7 +312,7 @@ private[offline] class SlidingWindowAggregationJoiner( } } }}) - offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap) + (offline.FeatureDataFrame(contextDF, allInferredFeatureTypes.toMap), retryableSwaFeatures) } /** diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala index e18d154b3..98ae98eda 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/FeathrUtils.scala @@ -51,6 +51,9 @@ private[feathr] object FeathrUtils { val ENABLE_SANITY_CHECK_MODE = "enable.sanity.check.mode" val SANITY_CHECK_MODE_ROW_COUNT = "sanity.check.row.count" val FILTER_NULLS = "filter.nulls" + val RETRY_ADDING_MISSING_SWA_FEATURES = "retry.adding.missing.swa.features" + // Retryer to be configured only for local tests + val LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES = "local.retry.adding.missing.swa.features" val STRING_PARAMETER_DELIMITER = "," // Used to check if the current dataframe has satisfied the checkpoint frequency @@ -86,7 +89,9 @@ private[feathr] object FeathrUtils { SPARK_JOIN_MIN_PARALLELISM -> (SQLConf.buildConf(getFullConfigKeyName(SPARK_JOIN_MIN_PARALLELISM )).stringConf.createOptional, "10"), ENABLE_SANITY_CHECK_MODE -> (SQLConf.buildConf(getFullConfigKeyName(ENABLE_SANITY_CHECK_MODE )).stringConf.createOptional, "false"), SANITY_CHECK_MODE_ROW_COUNT -> (SQLConf.buildConf(getFullConfigKeyName(SANITY_CHECK_MODE_ROW_COUNT )).stringConf.createOptional, "10"), - FILTER_NULLS -> (SQLConf.buildConf(getFullConfigKeyName(FILTER_NULLS )).stringConf.createOptional, "false") + FILTER_NULLS -> (SQLConf.buildConf(getFullConfigKeyName(FILTER_NULLS)).stringConf.createOptional, "false"), + LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES -> (SQLConf.buildConf(getFullConfigKeyName(LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES)).stringConf.createOptional, "false"), + RETRY_ADDING_MISSING_SWA_FEATURES -> (SQLConf.buildConf(getFullConfigKeyName(RETRY_ADDING_MISSING_SWA_FEATURES)).stringConf.createOptional, "true") ) /** diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 3370a6646..bf4331a23 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -2,7 +2,7 @@ package com.linkedin.feathr.offline import com.linkedin.feathr.offline.AssertFeatureUtils.{rowApproxEquals, validateRows} import com.linkedin.feathr.offline.util.FeathrUtils -import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, SKIP_MISSING_FEATURE, setFeathrJobParam} +import com.linkedin.feathr.offline.util.FeathrUtils.{FILTER_NULLS, LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES, SKIP_MISSING_FEATURE, setFeathrJobParam} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{LongType, StructField, StructType} @@ -328,6 +328,80 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { setFeathrJobParam(FILTER_NULLS, "false") } + /** + * test SWA with dense vector feature with retry. + * This should get handled by the SWA retry method. + */ + @Test + def testLocalAnchorSWAWithDenseVectorWithRetry(): Unit = { + setFeathrJobParam(LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES, "true") + val res = runLocalFeatureJoinForTest( + """ + | settings: { + | joinTimeSettings: { + | timestampColumn: { + | def: "timestamp" + | format: "yyyy-MM-dd" + | } + | simulateTimeDelay: 1d + | } + |} + | + |features: [ + | { + | key: [mId], + | featureList: ["aEmbedding", "memberEmbeddingAutoTZ"] + | } + |] + """.stripMargin, + """ + |sources: { + | swaSource: { + | location: { path: "generation/daily" } + | timePartitionPattern: "yyyy/MM/dd" + | timeWindowParameters: { + | timestampColumn: "timestamp" + | timestampColumnFormat: "yyyy-MM-dd" + | } + | } + |} + | + |anchors: { + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | aEmbedding: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | } + | memberEmbeddingAutoTZ: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | type: { + | type: TENSOR + | tensorCategory: SPARSE + | dimensionType: [INT] + | valType: FLOAT + | } + | } + | } + | } + |} + """.stripMargin, + observationDataPath = "slidingWindowAgg/csvTypeTimeFile1.csv").data + + val featureList = res.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("mId") else "null") + + assertEquals(featureList.size, 2) + assertEquals(featureList(0).getAs[Row]("aEmbedding"), mutable.WrappedArray.make(Array(5.5f, 5.8f))) + assertEquals(featureList(0).getAs[Row]("memberEmbeddingAutoTZ"), + TestUtils.build1dSparseTensorFDSRow(Array(0, 1), Array(5.5f, 5.8f))) + setFeathrJobParam(LOCAL_RETRY_ADDING_MISSING_SWA_FEATURES, "true") + } + /** * test SWA with dense vector feature * The feature dataset generation/daily has different but compatible schema for different partitions,