Постановка проблемы:
Нам нужно заменить синонимы слов в строке на их эквивалентные слова (из большой коллекции списка синонимов ~ 40000 + пары ключевых значений) в большом наборе данных (50000 строк).
Пример:
Ввод
Allen jeevi pramod Allen Armstrong
sandesh Armstrong jeevi
harsha Nischay DeWALT
Список синонимов (пара ключ-значение) // У нас 40000 записей
Key | Value
------------------------------------
Allen | Apex Tool Group
Armstrong | Columbus McKinnon
DeWALT | StanleyBlack
Приведенный выше список синонимов должен использоваться на входе, а выход должен быть таким, как показано в приведенном ниже формате.
Expected Output
Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack
Мы испробовали 3 подхода, каждый из которых имеет свои ограничения.
Подход 1
Использование UDF
public void test () {
List<Row> data = Arrays.asList(
RowFactory.create(0, "Allen jeevi pramod Allen Armstrong"),
RowFactory.create(1, "sandesh Armstrong jeevi"),
RowFactory.create(2, "harsha Nischay DeWALT")
);
StructType schema = new StructType(new StructField[] {
new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
List<Row> data2 = Arrays.asList(
RowFactory.create("Allen", "Apex Tool Group"),
RowFactory.create("Armstrong","Columbus McKinnon"),
RowFactory.create("DeWALT","StanleyBlack")
);
StructType schema2 = new StructType(new StructField[] {
new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
new StructField("sentence2", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);
UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>()
{
private static final long serialVersionUID = -5239951370238629896L;
@Override
public Boolean call(String t1, String t2) throws Exception {
return t1.contains(t2);
}
};
spark.udf().register("contains", contains, DataTypes.BooleanType);
UDF3<String, String, String, String> replaceWithTerm = new UDF3<String,
String, String, String>() {
private static final long serialVersionUID = -2882956931420910207L;
@Override
public String call(String t1, String t2, String t3) throws Exception {
return t1.replaceAll(t2, t3);
}
};
spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);
Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
.withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
.select(col("sentence_replaced"));
joined.show(false);
}
`
Input
Allen jeevi pramod Allen Armstrong
sandesh Armstrong jeevi
harsha Nischay DeWALT
Expected Output
Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack
Actual Output
Apex Tool Group jeevi pramod Apex Tool Group Armstrong
Allen jeevi pramod Allen Columbus McKinnon
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack
Проблема с подходом 1: если во входном наборе данных есть несколько ключей-синонимов, создается много строк, как показано в приведенном выше примере выходных данных. Ожидается только одна строка со всеми заменами
Подход 2. Использование ImmutableMap с функцией замены: здесь мы сохранили пару ключей и значений в хэш-карте внутри функции ImmutableMap, мы вызвали функцию замены, чтобы заменить все элементы, но если строка содержит несколько ключей, она игнорирует всю строку без замены единственного ключа…
try {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder()
.appName("JavaTokenizerExample").getOrCreate();
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
Dataset<Row> dataFileContent = sqlContext.load("com.databricks.spark.csv", options);
dataFileContent=dataFileContent.withColumn("ManufacturerSource", regexp_replace(col("ManufacturerSource"),"[^a-zA-Z0-9\\s+]",""));
dataFileContent= dataFileContent.na().replace("ManufacturerSource",ImmutableMap.<String, String>builder()
.put("Allen", "Apex Tool Group"),
.put("Armstrong","Columbus McKinnon"),
.put("DeWALT","StanleyBlack")
//Here we have 40000 entries
.build()
);
dataFileContent.show(10,false);
} catch (Exception e) {
e.printStackTrace();
}
Вот пример кода и результат:
Input
Allen jeevi pramod Allen Armstrong
sandesh Armstrong jeevi
harsha Nischay DeWALT
Expected Output
Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack
Actual Output
Allen jeevi pramod Allen Armstrong
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack
Подход 3
Использование замены всего в UDF
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("JoinFunctions").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("StringSimiliarityExample").getOrCreate();
Dataset<Row> sourceFileContent = sqlContext.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.load("source100.csv");
sourceFileContent.show(false);
StructType schema = new StructType(new StructField[] {
new StructField("label", DataTypes.IntegerType, false,
Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false,
Metadata.empty()) });
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
UDF1 mode = new UDF1<String, String>() {
public String call(final String types) throws Exception {
return types.replaceAll("Allen", "Apex Tool Group")
.replaceAll("Armstrong","Columbus McKinnon")
.replaceAll("DeWALT","StanleyBlack")
//40000 more entries.....
}
};
sqlContext.udf().register("mode", mode, DataTypes.StringType);
sentenceDataFrame.createOrReplaceTempView("people");
Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence");
newDF.show(false);
}
Вывод исключение Stackoverflow.
Здесь мы получаем исключение stackoverflow. Потому что это похоже на рекурсивный вызов функции.
Пожалуйста, сообщите нам, есть ли другие инновационные подходы, которые могут помочь решить эту проблему.