поиск заменить с помощью Apache spark java

Постановка проблемы:

Нам нужно заменить синонимы слов в строке на их эквивалентные слова (из большой коллекции списка синонимов ~ 40000 + пары ключевых значений) в большом наборе данных (50000 строк).

Пример:
Ввод

Allen jeevi pramod Allen Armstrong
sandesh Armstrong jeevi
harsha Nischay DeWALT

Список синонимов (пара ключ-значение) // У нас 40000 записей

Key         |   Value 
------------------------------------
Allen       |   Apex Tool Group
Armstrong   |   Columbus McKinnon
DeWALT      |   StanleyBlack

Приведенный выше список синонимов должен использоваться на входе, а выход должен быть таким, как показано в приведенном ниже формате.

Expected Output

Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
sandesh Columbus McKinnon jeevi
harsha Nischay StanleyBlack

Мы испробовали 3 подхода, каждый из которых имеет свои ограничения.

Подход 1

Использование UDF

    public void test () {
            List<Row> data = Arrays.asList(
            RowFactory.create(0, "Allen jeevi pramod Allen Armstrong"),
            RowFactory.create(1, "sandesh Armstrong jeevi"),
            RowFactory.create(2, "harsha Nischay DeWALT")
       );

        StructType schema = new StructType(new StructField[] {
            new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
       });
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

        List<Row> data2 = Arrays.asList(
            RowFactory.create("Allen", "Apex Tool Group"),
            RowFactory.create("Armstrong","Columbus McKinnon"),
            RowFactory.create("DeWALT","StanleyBlack")
        );

        StructType schema2 = new StructType(new StructField[] {
            new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
            new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
        });
        Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

        UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() 
        {
            private static final long serialVersionUID = -5239951370238629896L;

            @Override
            public Boolean call(String t1, String t2) throws Exception {
                return t1.contains(t2);
            }
        };
        spark.udf().register("contains", contains, DataTypes.BooleanType);

        UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, 
        String, String, String>() {
            private static final long serialVersionUID = -2882956931420910207L;

        @Override
        public String call(String t1, String t2, String t3) throws Exception {
            return t1.replaceAll(t2, t3);
        }
    };

    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);

Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
                                       .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
                                       .select(col("sentence_replaced"));

joined.show(false);
}

`

Input
    Allen jeevi pramod Allen Armstrong
    sandesh Armstrong jeevi
    harsha Nischay DeWALT

Expected Output
    Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

Actual Output
    Apex Tool Group jeevi pramod Apex Tool Group Armstrong
    Allen jeevi pramod Allen Columbus McKinnon
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

Проблема с подходом 1: если во входном наборе данных есть несколько ключей-синонимов, создается много строк, как показано в приведенном выше примере выходных данных. Ожидается только одна строка со всеми заменами

Подход 2. Использование ImmutableMap с функцией замены: здесь мы сохранили пару ключей и значений в хэш-карте внутри функции ImmutableMap, мы вызвали функцию замены, чтобы заменить все элементы, но если строка содержит несколько ключей, она игнорирует всю строку без замены единственного ключа…

try {

        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder()
                .appName("JavaTokenizerExample").getOrCreate();

        HashMap<String, String> options = new HashMap<String, String>();
        options.put("header", "true");
        Dataset<Row> dataFileContent = sqlContext.load("com.databricks.spark.csv", options);
        dataFileContent=dataFileContent.withColumn("ManufacturerSource", regexp_replace(col("ManufacturerSource"),"[^a-zA-Z0-9\\s+]",""));
        dataFileContent= dataFileContent.na().replace("ManufacturerSource",ImmutableMap.<String, String>builder()
            .put("Allen", "Apex Tool Group"),
            .put("Armstrong","Columbus McKinnon"),
            .put("DeWALT","StanleyBlack")
            //Here we have 40000 entries
            .build()

          );
          dataFileContent.show(10,false);

    } catch (Exception e) {
        e.printStackTrace();
    }

Вот пример кода и результат:

Input
    Allen jeevi pramod Allen Armstrong
    sandesh Armstrong jeevi
    harsha Nischay DeWALT

Expected Output
    Apex Tool Group jeevi pramod Apex Tool Group Columbus McKinnon
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

Actual Output
    Allen jeevi pramod Allen Armstrong
    sandesh Columbus McKinnon jeevi
    harsha Nischay StanleyBlack

Подход 3

Использование замены всего в UDF

public static void main(String[] args) {
          JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("JoinFunctions").setMaster("local[*]"));
          SQLContext sqlContext = new SQLContext(sc);
          SparkSession spark = SparkSession.builder().appName("StringSimiliarityExample").getOrCreate();


            Dataset<Row> sourceFileContent = sqlContext.read()
                        .format("com.databricks.spark.csv")
                        .option("header", "true")
                        .load("source100.csv");
            sourceFileContent.show(false);

        StructType schema = new StructType(new StructField[] {
        new StructField("label", DataTypes.IntegerType, false,
                Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false,
                Metadata.empty()) });
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
        UDF1 mode = new UDF1<String, String>() {
            public String call(final String types) throws Exception {
                return types.replaceAll("Allen", "Apex Tool Group")
                .replaceAll("Armstrong","Columbus McKinnon")
                .replaceAll("DeWALT","StanleyBlack")
                //40000 more entries.....
            }
        };

        sqlContext.udf().register("mode", mode, DataTypes.StringType);

        sentenceDataFrame.createOrReplaceTempView("people");
        Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence");
        newDF.show(false);
    }

Вывод исключение Stackoverflow.

Здесь мы получаем исключение stackoverflow. Потому что это похоже на рекурсивный вызов функции.

Пожалуйста, сообщите нам, есть ли другие инновационные подходы, которые могут помочь решить эту проблему.




Ответы (1)


Ни то, ни другое не сработает, так как у вас всегда есть проблема совпадений подстрок. Например:

  • ABC -> DE
  • ABCDE -> ABC

Что будет на выходе с текстом «ABCDEF HIJ KLM»? Он должен быть таким же, как вход, но ваш подход в лучшем случае выдаст «DEDEF HIJ KLM», а в худшем случае вы выполните двойную замену и получите «DEF HIJ KLM». Любой из этих случаев неверен.

Вы можете улучшить это, добавив границы к заменам, возможно, используя регулярное выражение. Однако лучшим способом было бы сначала правильно токенизировать ваш ввод, применить замену токена (которая может быть точным совпадением), а затем вернуть токенизацию в исходный формат. Это может быть так же просто, как разделение по пробелу, но вы должны четко указать, какие границы токенов могут существовать. (Остановки, дефисы и т. Д.).

person Ben Horsburgh    schedule 08.05.2017
comment
Некоторые из поднятых вами аспектов верны. вроде двойная замена, но мы продумали пару подходов. 1. Все подстроки должны оставаться наверху 2. Добавьте пробел до и после ключей. Но это основная проблема, с которой приходится сталкиваться. Проблема в том, что у нас есть большие наборы данных. - person jeevitesh; 08.05.2017
comment
Если вы токенизируете, это никогда не будет проблемой - person Ben Horsburgh; 08.05.2017
comment
Токенизация не решит проблему, основная проблема, с которой мы сталкиваемся здесь, заключается в количестве записей списка синонимов, которые следует использовать для замены, для небольшого количества списка синонимов (пара значений ключа), например 400 записей, он работает для большего набора, например 40000 выходит из строя. - person jeevitesh; 09.05.2017