Pyspark записывает данные из блоков данных в azure sql: ValueError: некоторые типы не могут быть определены после вывода

Я пишу данные из лазурных блоков данных в лазурный sql с помощью pyspark. Код работает без нулей, но когда фрейм данных содержит нули, я получаю следующую ошибку:

databricks/spark/python/pyspark/sql/pandas/conversion.py:300: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Unable to convert the field Product. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Context: Unsupported type in conversion from Arrow: null
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warnings.warn(msg)

ValueError: Some of types cannot be determined after inferring

Фрейм данных должен быть записан в sql, включая нули. Как мне решить эту проблему?

sqlContext = SQLContext(sc)

def to_sql(df, table):
  finaldf = sqlContext.createDataFrame(df)
  finaldf.write.jdbc(url=url, table= table, mode ="overwrite", properties = properties)

 to_sql(data, f"TF_{table.upper()}")

РЕДАКТИРОВАТЬ:

Решил его, создав функцию, которая отображает dtypes pandas на dtypes sql и выводит столбцы и dtypes как одну строку.

def convert_dtype(df):
    df_mssql = {'int64': 'bigint', 'object': 'varchar(200)', 'float64': 'float'}
    mydict = {}
    for col in df.columns:
        if str(df.dtypes[col]) in df_mssql:
            mydict[col] = df_mssql.get(str(df.dtypes[col]))
    l = " ".join([str(k[0] + " " + k[1] + ",") for k in list(mydict.items())])
    return l[:-1]

Передача этой строки в параметр createTableColumnTypes решила этот сценарий

jdbcDF.write \
    .option("createTableColumnTypes", convert_dtype(df) \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

person Luukv93    schedule 19.11.2020    source источник


Ответы (1)


Для этого вам нужно указать схему в своем операторе записи. Вот пример из документации, ссылка на которую также приведена ниже:

jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

person AssureTech    schedule 19.11.2020
comment
Привет, спасибо, что ответили. Я написал небольшую функцию для сопоставления dtypes pandas с одной строкой, содержащей столбцы и dtypes sql. Отредактирую это в своем посте. - person Luukv93; 22.11.2020