использование пользовательского оператора Java в API Java в потоках инфосферы

Я некоторое время искал, как использовать настраиваемый оператор Java с API-интерфейсом Java для информационных потоков.

Что мне нужно, так это написать настроенный оператор, как показано ниже...

public class Test extends AbstractOperator {
private int i;
private int num;
@Override
public synchronized void initialize(OperatorContext context) throws Exception {
super.initialize(context);
i = 0; ....

Я хочу использовать его, как показано ниже....

        Topology topology = new Topology("toplogy_test");
        TStream<String> inDataFileName = ...
//call the "Test" operator here

person Exorcismus    schedule 29.01.2017    source источник


Ответы (1)


Вы можете вызвать оператор Java/оператор C++ из API топологии, выполнив следующие действия:

  1. Добавьте инструментарий оператора Java:

    SPL.addToolkit(топология, новый файл("/home/streamsadmin/myTk"));

  2. Преобразование входящего потока в поток SPL:

    StreamSchema rstringSchema = Type.Factory.getStreamSchema("tuple<rstring rstring_attr_name>");
    
    SPLStream splInputStream = SPLStreams.convertStream(inDataFileName, new BiFunction<String, OutputTuple, OutputTuple>(){
    
      @Override
      public OutputTuple apply(String input_string, OutputTuple output_rstring) {
        output_rstring.setString("rstring_attr_name", input_string);
        return output_rstring;
      }}, rstringSchema);
    
  3. Вызов оператора:

    SPLStream splOutputStream = SPL.invokeOperator("OperatorNamespace::YourOperatorName", splInputStream, rstringSchema, new HashMap());

Вы можете найти больше информации об этом здесь:

http://ibmstreams.github.io/streamsx.documentation/docs/4.2/java/java-appapi-devguide/#integrating-spl-operators-with-the-java-application-api

Кстати, если вы думаете об использовании Topology API для написания топологии Streams, то проще написать обычный класс Java и вызывать его непосредственно из Topology API.

Например:

MyJavaCode someObj = new MyJavaCode();

Topology topology = new Topology("MyTopology");
TStream<String> inDataFileName = ...
inDataFileName.transform(new Function<String, String>(){
        @Override
        public String apply(String word) {
            return someObj.someFunction(word);
        }
    });

Единственное требование здесь состоит в том, что ваш класс Java должен реализовать Serializable.

person Samantha Chan    schedule 31.01.2017
comment
someObj.someFunction(word) , может ли эта функция быть вызовом веб-службы или вызовом вставки JDBC? - person Exorcismus; 01.02.2017
comment
Да, функция может делать что угодно. Я использовал этот подход и обернул библиотеку GSON для потокового анализа JSON с API топологии. Главное — убедиться, что класс является сериализуемым, а объекты, передаваемые обратно в топологический API, также сериализуемы. - person Samantha Chan; 01.02.2017