avro json дополнительное поле

У меня есть следующая схема avro

{
    "type":"record",
    "name":"test",
    "namespace":"test.name",
    "fields":[
        {"name":"items","type":
            {"type":"array",
                "items":
                    {"type":"record","name":"items",
                        "fields":[
                                {"name":"name","type":"string"},
                                {"name":"state","type":"string"}
                            ]
                    }
            }
        },
        {"name":"firstname","type":"string"}
    ]
}

когда я использую декодер Json и кодировщик avro для кодирования данных Json:

val writer = new GenericDatumWriter[GenericRecord](schema)
val reader = new GenericDatumReader[GenericRecord](schema)
val baos = new ByteArrayOutputStream
val decoder: JsonDecoder = DecoderFactory.get.jsonDecoder(schema, json)
val encoder = EncoderFactory.get.binaryEncoder(baos, null)
val datum = reader.read(null, decoder)
writer.write(datum, encoder)
encoder.flush()
val avroByteArray = baos.toByteArray

сценарий 1: когда я передаю следующий json для кодирования, он работает нормально:

{
  "items": [
    {
      "name": "dallas",
      "state": "TX"
    }
  ],
  "firstname":"arun"
}

сценарий 2: когда я передаю дополнительный атрибут в json на корневом уровне (фамилия), он может кодировать и работает нормально:

{
  "items": [
    {
      "name": "dallas",
      "state": "TX"
    }
  ],
  "firstname":"fname",
  "lastname":"lname"
}

сценарий 3: когда я добавляю дополнительный атрибут в запись массива (страна), возникает следующее исключение:

Expected record-end. Got FIELD_NAME
org.apache.avro.AvroTypeException: Expected record-end. Got FIELD_NAME
    at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:698)
{
  "items": [
    {
      "name": "dallas",
      "state": "TX",
      "country":"USA"
    }
  ],
  "firstname":"fname",
  "lastname":"lname"
}

Мне нужно, чтобы сценарий № 3 работал, любая помощь будет отличной.


person ASe    schedule 24.01.2018    source источник
comment
единственный способ решить эту проблему — создать собственный JsonDecoder, используя простой анализ Json, и для каждого поля в схеме создать GenericRecord. Я опубликую код на github.   -  person ASe    schedule 29.01.2018


Ответы (2)


Вам поможет скрыть данные json с соответствующим форматом схемы avro, используя подход фрейма данных spark.

  1. создать тип структуры из схемы avro, используя SchemaConverters
  2. создать фрейм данных из типа структуры и шага json rdd string
  3. преобразовать строки фрейма данных в json, используя df.toJSON

Примеры тестов:

import java.io.ByteArrayOutputStream

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic._
import org.apache.avro.io._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession.Builder
import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType
import org.scalatest.{Matchers, WordSpecLike}

class Test extends WordSpecLike
  with Matchers {

  val schemaString: String =
    """{
      |    "type":"record",
      |    "name":"test",
      |    "namespace":"test.name",
      |    "fields":[
      |        {"name":"items","type":
      |            {"type":"array",
      |                "items":
      |                    {"type":"record","name":"items",
      |                        "fields":[
      |                                {"name":"name","type":"string"},
      |                                {"name":"state","type":"string"}
      |                            ]
      |                    }
      |            }
      |        },
      |        {"name":"firstname","type":"string"}
      |    ]
      |}""".stripMargin

  // create spark session and sql context
  val builder: Builder = SparkSession.builder.appName("testAvroSpark")
  val sparkSession: SparkSession = builder.master("local[1]").getOrCreate()
  val sc: SparkContext = sparkSession.sparkContext
  val sqlContext: SQLContext = sparkSession.sqlContext

  // avro schema from json type schema string
  val schema: Schema = new Parser().parse(schemaString)

  // get spark struct type from avro schema
  val requiredType: StructType =
    SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

  "scenario one json data with given schema" in {
    val scenarioOneJson: String =
      """{
        |  "items": [
        |    {
        |      "name": "dallas",
        |      "state": "TX"
        |    }
        |  ],
        |  "firstname":"rumesh"
        |}""".stripMargin

    val jsonRdd: RDD[String] = sc.parallelize(Seq(scenarioOneJson))

    val outputJsonExpected: String =
      """{"items":[{"name":"dallas","state":"TX"}],"firstname":"rumesh"}"""

    val resultJson: String = customJsonConverter(requiredType, jsonRdd).head

    assert(resultJson === outputJsonExpected)
    assert(binaryEncoder(schema, outputJsonExpected) === binaryEncoder(schema, resultJson))
  }

  "scenario two json data with given schema" in {
    val scenarioTwoJson: String =
      """{
        |  "items": [
        |    {
        |      "name": "dallas",
        |      "state": "TX"
        |    }
        |  ],
        |  "firstname":"rumesh",
        |  "lastname":"krish"
        |}""".stripMargin

    val jsonRdd: RDD[String] = sc.parallelize(Seq(scenarioTwoJson))

    val outputJsonExpected: String =
      """{"items":[{"name":"dallas","state":"TX"}],"firstname":"rumesh"}"""

    val resultJson: String = customJsonConverter(requiredType, jsonRdd).head

    assert(resultJson === outputJsonExpected)
    assert(binaryEncoder(schema, outputJsonExpected) === binaryEncoder(schema, resultJson))
  }

  "scenario three json data with given schema" in {
    val scenarioThreeJson: String =
      """{
        |  "items": [
        |    {
        |      "name": "dallas",
        |      "state": "TX",
        |      "country":"USA"
        |    }
        |  ],
        |  "firstname":"rumesh",
        |  "lastname":"krish"
        |}""".stripMargin

    val jsonRdd: RDD[String] = sc.parallelize(Seq(scenarioThreeJson))

    val outputJsonExpected: String =
      """{"items":[{"name":"dallas","state":"TX"}],"firstname":"rumesh"}"""

    val resultJson: String = customJsonConverter(requiredType, jsonRdd).head

    assert(resultJson === outputJsonExpected)
    assert(binaryEncoder(schema, outputJsonExpected) === binaryEncoder(schema, resultJson))
  }

  /**
    * convert the json using data frame json parser with given schema struct type
    *
    * @param customType   given data frame struct type
    * @param jsonInputRdd json rdd string
    * @return
    */
  private def customJsonConverter(customType: StructType,
                                  jsonInputRdd: RDD[String]): List[String] = {
    // create data frame from rdd string with struct type schema
    val df: DataFrame = sqlContext.read.schema(customType).json(jsonInputRdd)

    // get the list of json string data frame
    df.toJSON.rdd.toLocalIterator.toList
  }


  /**
    * avro binary serialization
    *
    * @param avroSchema avro schema
    * @param jsonData   json data
    * @return
    */
  private def binaryEncoder(avroSchema: Schema, jsonData: String): Array[Byte] = {
    val writer = new GenericDatumWriter[GenericRecord](avroSchema)
    val reader = new GenericDatumReader[GenericRecord](avroSchema)
    val baos = new ByteArrayOutputStream
    val decoder: JsonDecoder = DecoderFactory.get.jsonDecoder(avroSchema, jsonData)
    val encoder = EncoderFactory.get.binaryEncoder(baos, null)
    val datum = reader.read(null, decoder)
    writer.write(datum, encoder)
    encoder.flush()
    baos.toByteArray
  }

}
person Rumesh Krishnan    schedule 10.02.2018

Ваша схема не представляет структуру в сценарии 3: отсутствует поле «страна»:

{"name":"country", "type":"string"}

Вы только объявляете поля «имя» и «состояние». Затем декодер правильно ожидает, что (под)запись закончится после них, и, как указано в сообщении об ошибке, вместо этого он получает (другое) имя поля («страна»).

Кстати: вы можете использовать генератор, чтобы всегда получать соответствующую схему из вашего JSON, в сети есть пара.

person jasie    schedule 24.07.2018