Создайте пример потоковой передачи с помощью Calcite, используя CSV

Я пытаюсь создать базовую потоковую программу с помощью Calcite, используя CSV в качестве источника данных. Я могу выполнять запросы с помощью sqlline, но я не могу сделать это программно. Мой код:

пример.json

{
  version: '1.0',
  defaultSchema: 'STREAM',
  schemas: [
    {
      name: 'SS',
      tables: [
        {
          name: 'ORDERS',
          type: 'custom',
          factory: 'org.apache.calcite.adapter.csv.CsvStreamTableFactory',
          stream: {
            stream: true
          },
          operand: {
            file: 'sales/SORDERS.csv',
            flavor: "scannable"
          }
        }
      ]
    }
  ]
}

ЗАКАЗЧИКИ.csv

PRODUCTID:int,ORDERID:int,UNITS:int
3,4,5
2,5,12
2,1,6

SimpleQuery.java

package stream_test;

import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

/**
 * Example of using Calcite via JDBC.
 *
 * <p>Schema is specified programmatically.</p>
 */
public class SimpleQuery {
  public static void main(String[] args) throws Exception {
    new SimpleQuery().run();
  }

  public void run() throws ClassNotFoundException, SQLException {
    Class.forName("org.apache.calcite.jdbc.Driver");
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    Connection connection =
        DriverManager.getConnection("jdbc:calcite:model="
                + "/home/hduser/Downloads/calcite-master/example/csv/target/test-classes/example.json",info);
    CalciteConnection calciteConnection =
        connection.unwrap(CalciteConnection.class);
    //SchemaPlus rootSchema = calciteConnection.getRootSchema();
    //rootSchema.add("os", new ReflectiveSchema(new Os()));
    Statement statement = connection.createStatement();
    ResultSet resultSet =
            statement.executeQuery("select stream * from SS.ORDERS where SS.ORDERS.UNITS > 5");
    final StringBuilder buf = new StringBuilder();
    while (resultSet.next()) {
      int n = resultSet.getMetaData().getColumnCount();
      for (int i = 1; i <= n; i++) {
        buf.append(i > 1 ? "; " : "")
            .append(resultSet.getMetaData().getColumnLabel(i))
            .append("=")
            .append(resultSet.getObject(i));
      }
      System.out.println(buf.toString());
      buf.setLength(0);
    }
    resultSet.close();
    statement.close();
    connection.close();
  }

}

Наконец, у меня есть зависимости calcite-core 1.8.0, net.sf.opencsv 2.3, calcite-avatica 1.6.0, calcite-linq4j 1.8.0, sqlline 1.1.9, hamcrest-core 1.3, com.github.stephenc.jcip 1.0-1, commons-lang3 3.4, guava 19.0 и импортированный calcite-example-csv-1.9.0-SNAPSHOT (который я упаковал с помощью maven из версии на github).

Когда я пытаюсь запустить код, я получаю:

Exception in thread "main" java.lang.NoSuchFieldError: CANCEL_FLAG
    at org.apache.calcite.adapter.csv.CsvStreamScannableTable.scan(CsvStreamScannableTable.java:66)
    at org.apache.calcite.interpreter.TableScanNode.createScannable(TableScanNode.java:117)
    at org.apache.calcite.interpreter.TableScanNode.create(TableScanNode.java:94)
    at org.apache.calcite.interpreter.Nodes$CoreCompiler.visit(Nodes.java:68)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.calcite.util.ReflectUtil.invokeVisitorInternal(ReflectUtil.java:257)
    at org.apache.calcite.util.ReflectUtil.invokeVisitor(ReflectUtil.java:214)
    at org.apache.calcite.util.ReflectUtil$1.invokeVisitor(ReflectUtil.java:471)
    at org.apache.calcite.interpreter.Interpreter$Compiler.visit(Interpreter.java:476)
    at org.apache.calcite.interpreter.Interpreter$Compiler.visitRoot(Interpreter.java:433)
    at org.apache.calcite.interpreter.Interpreter.<init>(Interpreter.java:75)
    at Baz.bind(Unknown Source)
    at org.apache.calcite.jdbc.CalcitePrepare$CalciteSignature.enumerable(CalcitePrepare.java:327)
    at org.apache.calcite.jdbc.CalciteConnectionImpl.enumerable(CalciteConnectionImpl.java:282)
    at     package stream_test;

Любые идеи о том, как это исправить?


person grtheod    schedule 04.09.2016    source источник


Ответы (2)


У вас несоответствующие библиотеки (calcite-example-csv версии 1.9.0-SNAPSHOT по сравнению с calcite-core версии 1.8.0). См. обсуждение списка разработчиков кальцита для более подробной информации.

person Julian Hyde    schedule 05.09.2016
comment
Я использовал mvn install во всем проекте и еще раз в папке avatica. Затем я импортировал в качестве внешних банок calcite-example-csv, calcite-core, avatica, linq4j, avatica-metrics, avatica-standalone со всеми их источниками и тестами и получаю: Exception in thread main java.lang.AbstractMethodError: org .apache.calcite.config.CalciteConnectionProperty.valueClass()Ljava/lang/Class; в org.apache.calcite.avatica.ConnectionConfigImpl$PropEnv.getEnum(ConnectionConfigImpl.java:228) ... - person grtheod; 05.09.2016

В Eclipse я использовал:

В качестве зависимостей maven: commons-io 2.4, commons-logging 1.1.3, commons-lang3 3.2, janino 2.7.6, eigenbase-properties 1.1.5, avatica 1.8.0, opencsv 2.3, json-simple 1.1

и в качестве внешних банок: calcite-core 1.9.0, example-csv-1.9.0, calcite-linq4j 1.9.0 (все версии SNAPSHOT) после их создания с помощью команды mvn install из последней версии кальцита на github.

person grtheod    schedule 10.09.2016