невозможно создать кадр данных из файла последовательности в Spark, созданного Sqoop

Я хочу прочитать данные orders и создать из них RDD, который хранится в виде файла sequence в Hadoop fs в cloudera vm. Ниже приведены мои шаги:

1) Импорт данных заказов в виде файла последовательности:

sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba --password cloudera  --table orders -m 1 --target-dir /ordersDataSet --as-sequencefile   

2) Чтение файла в искровой скале:

Искра 1.6

val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec => rec.toString())  

3) Когда я пытаюсь прочитать данные из RDD выше, он выдает следующую ошибку:

Caused by: java.io.IOException: WritableName can't load class: orders
    at org.apache.hadoop.io.WritableName.getClass(WritableName.java:77)
    at org.apache.hadoop.io.SequenceFile$Reader.getValueClass(SequenceFile.java:2108)
    ... 17 more
Caused by: java.lang.ClassNotFoundException: Class orders not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2185)
    at org.apache.hadoop.io.WritableName.getClass(WritableName.java:75)
    ... 18 more

Я не знаю, почему он говорит, что не может найти заказы. Где я ошибаюсь?

Я также ссылался на коды по этим двум ссылкам, но безуспешно:
1) Часть последовательности ссылок
2) Ссылка на шаг №. 8


person RushHour    schedule 10.11.2018    source источник
comment
scala› import org.apache.hadoop.io.Text scala› import org.apache.hadoop.io.IntWritable Импортируйте их и попробуйте. Основной синтаксис: scala> val result = sc.sequenceFile(/filepath/filename, classOf[Text], classOf[IntWritable]). map{case (x, y) =› (x.toString, y.get())}   -  person Kullayappa M    schedule 10.11.2018
comment
Обязательно ли использовать Sqoop? Почему бы не использовать SparkSQL JDBC?   -  person OneCricketeer    schedule 10.11.2018
comment
Как этого можно достичь?   -  person RushHour    schedule 10.11.2018


Ответы (2)


sqoop тут ни при чем, вот пример более реалистичного сценария, при котором saveAsSequenceFile всегда предполагает пары k, v — это может вам помочь:

import org.apache.hadoop.io._

val RDD = sc.parallelize( List( (1, List("A", "B")) , (2, List("B", "C")) , (3, List("C", "D", "E")) ) )
val RDD2 = RDD.map(x => (x._1, x._2.mkString("/")))
RDD2.saveAsSequenceFile("/rushhour/seq-directory/2")

val sequence_data = sc.sequenceFile("/rushhour/seq-directory/*", classOf[IntWritable], classOf[Text])
                  .map{case (x, y) => (x.get(), y.toString().split("/")(0), y.toString().split("/")(1))}

sequence_data.collect

возвращает:

res20: Array[(Int, String, String)] = Array((1,A,B), (2,B,C), (3,C,D), (1,A,B), (2,B,C), (3,C,D))

Я не уверен, хотите ли вы RDD или DF, но преобразование RDD в DF, конечно, тривиально.

person thebluephantom    schedule 10.11.2018

Я нашел решение своей проблемы. Что ж, я собираюсь написать длинное решение, но я надеюсь, что оно будет иметь смысл.

1) Когда я попытался прочитать данные, которые были импортированы в HDFS с помощью SQOOP, выдается ошибка по следующим причинам:

A) Файл последовательности - это все о key-value pair. Поэтому, когда я импортирую его с помощью sqoop, импортируемые данные не находятся в паре ключ-значение, поэтому при чтении выдается ошибка.
B) Если вы попытаетесь прочитать few characters, из которого вы можете определить требуемый two classes для передачи в качестве входных данных при чтении файла последовательности вы получите данные, как показано ниже:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/pa* | head -c 300
SEQ!org.apache.hadoop.io.LongWritableorders�;�M��c�K�����@���-OCLOSED@���PENDING_PAYMENT@���/COMPLETE@���"{CLOSED@���cat: Unable to write to output stream.  

Выше вы можете видеть только one class, т.е. org.apache.hadoop.io.LongWritable, и когда я передаю это при чтении данных последовательности, он выдает ошибку, упомянутую в сообщении.

val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.LongWritable]).map(rec => rec.toString())  

Я не думаю, что точка B является основной причиной этой ошибки, но я очень уверен, что точка A является настоящим виновником этой ошибки.

2) Ниже показано, как я решил свою проблему.

Я импортировал данные как файл avro data в другое место назначения, используя SQOOP. Затем я создал фрейм данных из avro, используя следующие способы:

scala> import com.databricks.spark.avro._;
scala> val avroData=sqlContext.read.avro("path")  

Теперь я создал key-value pair и сохранил его как файл sequence.

avroData.map(p=>(p(0).toString,(p(0)+"\t"+p(1)+"\t"+p(2)+"\t"+p(3)))).saveAsSequenceFile("/user/cloudera/problem5/sequence")  

Теперь, когда я пытаюсь прочитать few символов из написанного выше файла, он дает мне two classes, которые мне нужны при чтении файла, как показано ниже:

[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/part-00000 | head -c 300
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text^#%���8P���11  1374735600000   11599   CLOSED&2#2  1374735600000   256 PENDING_PAYMENT!33  1374735600000   12111   COMPLETE44  1374735600000   8827    CLOSED!55   1374735600000   11318   COMPLETE 66 1374cat: Unable to write to output stream.  

scala> val sequenceData=sc.sequenceFile("/user/cloudera/problem5/sequence",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec=>rec.toString)
sequenceData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at map at <console>:30

Теперь, когда я пытаюсь распечатать данные, он отображает данные, как показано ниже:

scala> sequenceData.take(4).foreach(println)
(1,1    1374735600000   11599   CLOSED)
(2,2    1374735600000   256 PENDING_PAYMENT)
(3,3    1374735600000   12111   COMPLETE)
(4,4    1374735600000   8827    CLOSED)

И последнее, но не менее важное: спасибо всем за ваши высоко оцененные усилия. Ваше здоровье!!

person RushHour    schedule 10.11.2018
comment
То же самое и с данными типа ORC. Он должен быть сохранен в формате пары ключ-значение. - person RushHour; 10.11.2018
comment
ORC можно просто сохранить через df Writer. - person thebluephantom; 11.11.2018