Сборщик данных StreamSets считывает неправильное время из Kafka

Похоже, что сборщик данных StreamSets считывает неверные значения даты и времени.

Я попытался прочитать простую тему из Confluent: когда я проверяю значение даты и времени в миллисекундах с темами Landoop Kafka, он показывает правильное время и дату, но когда я читаю его с помощью Kafka Consumer 0.10.0.0 в StreamSets, он дает мне дату и время на 3 часа и 20 минут меньше. . Мой фактический часовой пояс GMT+03:00.

Мои настройки SDC имеют правильный часовой пояс, серверная ОС также имеет правильный часовой пояс, даже time:now() в функции SDC дает правильный результат!

Есть предположения, почему?

ОБНОВЛЕНИЕ - экспортировал мой рабочий процесс

    {
  "pipelineConfig" : {
    "schemaVersion" : 4,
    "version" : 7,
    "pipelineId" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb",
    "title" : "connector_test_data_file2",
    "description" : "",
    "uuid" : "13fea9f9-5253-432b-8bf4-09dfbe763dcd",
    "configuration" : [ {
      "name" : "executionMode",
      "value" : "STANDALONE"
    }, {
      "name" : "deliveryGuarantee",
      "value" : "AT_LEAST_ONCE"
    }, {
      "name" : "startEventStage",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1"
    }, {
      "name" : "stopEventStage",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1"
    }, {
      "name" : "shouldRetry",
      "value" : true
    }, {
      "name" : "retryAttempts",
      "value" : -1
    }, {
      "name" : "memoryLimit",
      "value" : "${jvm:maxMemoryMB() * 0.65}"
    }, {
      "name" : "memoryLimitExceeded",
      "value" : "STOP_PIPELINE"
    }, {
      "name" : "notifyOnStates",
      "value" : [ "RUN_ERROR", "STOPPED", "FINISHED" ]
    }, {
      "name" : "emailIDs",
      "value" : [ ]
    }, {
      "name" : "constants",
      "value" : [ ]
    }, {
      "name" : "badRecordsHandling",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_recordstolocalfilesystem_ToErrorLocalFSDTarget::1"
    }, {
      "name" : "errorRecordPolicy",
      "value" : "ORIGINAL_RECORD"
    }, {
      "name" : "workerCount",
      "value" : 0
    }, {
      "name" : "clusterSlaveMemory",
      "value" : 1024
    }, {
      "name" : "clusterSlaveJavaOpts",
      "value" : "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Dlog4j.debug"
    }, {
      "name" : "clusterLauncherEnv",
      "value" : [ ]
    }, {
      "name" : "mesosDispatcherURL",
      "value" : null
    }, {
      "name" : "hdfsS3ConfDir",
      "value" : null
    }, {
      "name" : "rateLimit",
      "value" : 0
    }, {
      "name" : "maxRunners",
      "value" : 0
    }, {
      "name" : "webhookConfigs",
      "value" : [ ]
    }, {
      "name" : "sparkConfigs",
      "value" : [ ]
    }, {
      "name" : "statsAggregatorStage",
      "value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget::1"
    } ],
    "uiInfo" : {
      "previewConfig" : {
        "previewSource" : "CONFIGURED_SOURCE",
        "batchSize" : "1000",
        "timeout" : 10000,
        "writeToDestinations" : false,
        "executeLifecycleEvents" : false,
        "showHeader" : false,
        "showFieldType" : true,
        "rememberMe" : false
      }
    },
    "stages" : [ {
      "instanceName" : "KafkaConsumer_01",
      "library" : "streamsets-datacollector-apache-kafka_0_10-lib",
      "stageName" : "com_streamsets_pipeline_stage_origin_kafka_KafkaDSource",
      "stageVersion" : "5",
      "configuration" : [ {
        "name" : "kafkaConfigBean.dataFormatConfig.filePatternInArchive",
        "value" : "*"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.charset",
        "value" : "UTF-8"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.removeCtrlChars",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.textMaxLineLen",
        "value" : 1024
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.useCustomDelimiter",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.customDelimiter",
        "value" : "\\r\\n"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.includeCustomDelimiterInTheText",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.jsonContent",
        "value" : "MULTIPLE_OBJECTS"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.jsonMaxObjectLen",
        "value" : 4096
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvFileFormat",
        "value" : "CSV"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvHeader",
        "value" : "NO_HEADER"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvMaxObjectLen",
        "value" : 1024
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvAllowExtraColumns",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvExtraColumnPrefix",
        "value" : "_extra_"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvCustomDelimiter",
        "value" : "|"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvCustomEscape",
        "value" : "\\"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvCustomQuote",
        "value" : "\""
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvEnableComments",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvCommentMarker",
        "value" : "#"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvIgnoreEmptyLines",
        "value" : true
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvRecordType",
        "value" : "LIST_MAP"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.csvSkipStartLines",
        "value" : 0
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.parseNull",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.nullConstant",
        "value" : "\\\\N"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.xmlRecordElement",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.includeFieldXpathAttributes",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.xPathNamespaceContext",
        "value" : [ ]
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.outputFieldAttributes",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.xmlMaxObjectLen",
        "value" : 4096
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.logMode",
        "value" : "COMMON_LOG_FORMAT"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.logMaxObjectLen",
        "value" : 1024
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.retainOriginalLine",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.customLogFormat",
        "value" : "%h %l %u %t \"%r\" %>s %b"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.regex",
        "value" : "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.fieldPathsToGroupName",
        "value" : [ {
          "fieldPath" : "/",
          "group" : 1
        } ]
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.grokPatternDefinition",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.grokPattern",
        "value" : "%{COMMONAPACHELOG}"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.onParseError",
        "value" : "ERROR"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.maxStackTraceLines",
        "value" : 50
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.enableLog4jCustomLogFormat",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.log4jCustomLogFormat",
        "value" : "%r [%t] %-5p %c %x - %m%n"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.avroSchemaSource",
        "value" : "REGISTRY"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.avroSchema",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.schemaRegistryUrls",
        "value" : [ "http://test-kafka01.mytest.com:8081" ]
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.schemaLookupMode",
        "value" : "AUTO"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.subject",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.schemaId",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.protoDescriptorFile",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.messageType",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.isDelimited",
        "value" : true
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.binaryMaxObjectLen",
        "value" : 1024
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.datagramMode",
        "value" : "SYSLOG"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.typesDbPath",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.convertTime",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.excludeInterval",
        "value" : true
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.authFilePath",
        "value" : null
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.wholeFileMaxObjectLen",
        "value" : 8192
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.rateLimit",
        "value" : "-1"
      }, {
        "name" : "kafkaConfigBean.dataFormatConfig.verifyChecksum",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.dataFormat",
        "value" : "AVRO"
      }, {
        "name" : "kafkaConfigBean.metadataBrokerList",
        "value" : "test-kafka01:9092,test-kafka02:9092,test-kafka03:9092"
      }, {
        "name" : "kafkaConfigBean.zookeeperConnect",
        "value" : "test-kafka01:2181,test-kafka02:2181,test-kafka03:2181"
      }, {
        "name" : "kafkaConfigBean.consumerGroup",
        "value" : "connector_elastic"
      }, {
        "name" : "kafkaConfigBean.topic",
        "value" : "test_data"
      }, {
        "name" : "kafkaConfigBean.produceSingleRecordPerMessage",
        "value" : false
      }, {
        "name" : "kafkaConfigBean.maxBatchSize",
        "value" : 1000
      }, {
        "name" : "kafkaConfigBean.maxWaitTime",
        "value" : 2000
      }, {
        "name" : "kafkaConfigBean.maxRatePerPartition",
        "value" : 1000
      }, {
        "name" : "kafkaConfigBean.keyDeserializer",
        "value" : "CONFLUENT"
      }, {
        "name" : "kafkaConfigBean.valueDeserializer",
        "value" : "CONFLUENT"
      }, {
        "name" : "kafkaConfigBean.kafkaConsumerConfigs",
        "value" : [ ]
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      } ],
      "uiInfo" : {
        "yPos" : 40.85545349121094,
        "stageType" : "SOURCE",
        "rawSource" : {
          "configuration" : [ {
            "name" : "brokerHost",
            "value" : "localhost"
          }, {
            "name" : "brokerPort",
            "value" : 9092
          }, {
            "name" : "topic",
            "value" : "myTopic"
          }, {
            "name" : "partition",
            "value" : 0
          }, {
            "name" : "maxWaitTime",
            "value" : 1000
          } ]
        },
        "description" : "",
        "label" : "input",
        "xPos" : 184.59434509277344
      },
      "inputLanes" : [ ],
      "outputLanes" : [ "KafkaConsumer_01OutputLane15081523550290" ],
      "eventLanes" : [ ]
    }, {
      "instanceName" : "LocalFS_01",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_localfilesystem_LocalFileSystemDTarget",
      "stageVersion" : "3",
      "configuration" : [ {
        "name" : "configs.uniquePrefix",
        "value" : "sdc-${sdc:id()}"
      }, {
        "name" : "configs.fileNameSuffix",
        "value" : null
      }, {
        "name" : "configs.dirPathTemplateInHeader",
        "value" : false
      }, {
        "name" : "configs.dirPathTemplate",
        "value" : "/tmp/data_monitor/out/success/${YYYY()}-${MM()}-${DD()}-${hh()}"
      }, {
        "name" : "configs.timeZoneID",
        "value" : "Europe/Moscow"
      }, {
        "name" : "configs.timeDriver",
        "value" : "${time:now()}"
      }, {
        "name" : "configs.maxRecordsPerFile",
        "value" : 0
      }, {
        "name" : "configs.maxFileSize",
        "value" : 0
      }, {
        "name" : "configs.idleTimeout",
        "value" : "${1 * HOURS}"
      }, {
        "name" : "configs.compression",
        "value" : "NONE"
      }, {
        "name" : "configs.otherCompression",
        "value" : null
      }, {
        "name" : "configs.fileType",
        "value" : "TEXT"
      }, {
        "name" : "configs.keyEl",
        "value" : "${uuid()}"
      }, {
        "name" : "configs.lateRecordsLimit",
        "value" : "${1 * HOURS}"
      }, {
        "name" : "configs.rollIfHeader",
        "value" : false
      }, {
        "name" : "configs.rollHeaderName",
        "value" : "roll"
      }, {
        "name" : "configs.lateRecordsAction",
        "value" : "SEND_TO_ERROR"
      }, {
        "name" : "configs.lateRecordsDirPathTemplate",
        "value" : "/tmp/late/${YYYY()}-${MM()}-${DD()}"
      }, {
        "name" : "configs.dataFormat",
        "value" : "JSON"
      }, {
        "name" : "configs.hdfsPermissionCheck",
        "value" : true
      }, {
        "name" : "configs.permissionEL",
        "value" : null
      }, {
        "name" : "configs.skipOldTempFileRecovery",
        "value" : false
      }, {
        "name" : "configs.dataGeneratorFormatConfig.charset",
        "value" : "UTF-8"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvFileFormat",
        "value" : "CSV"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvHeader",
        "value" : "NO_HEADER"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvReplaceNewLines",
        "value" : true
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvReplaceNewLinesString",
        "value" : " "
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvCustomDelimiter",
        "value" : "|"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvCustomEscape",
        "value" : "\\"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.csvCustomQuote",
        "value" : "\""
      }, {
        "name" : "configs.dataGeneratorFormatConfig.jsonMode",
        "value" : "MULTIPLE_OBJECTS"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.textFieldPath",
        "value" : "/text"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.textRecordSeparator",
        "value" : "\\n"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.textFieldMissingAction",
        "value" : "ERROR"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.textEmptyLineIfNull",
        "value" : false
      }, {
        "name" : "configs.dataGeneratorFormatConfig.avroSchemaSource",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.avroSchema",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.registerSchema",
        "value" : false
      }, {
        "name" : "configs.dataGeneratorFormatConfig.schemaRegistryUrlsForRegistration",
        "value" : [ ]
      }, {
        "name" : "configs.dataGeneratorFormatConfig.schemaRegistryUrls",
        "value" : [ ]
      }, {
        "name" : "configs.dataGeneratorFormatConfig.schemaLookupMode",
        "value" : "SUBJECT"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.subject",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.subjectToRegister",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.schemaId",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.avroCompression",
        "value" : "NULL"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.binaryFieldPath",
        "value" : "/"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.protoDescriptorFile",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.messageType",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.fileNameEL",
        "value" : null
      }, {
        "name" : "configs.dataGeneratorFormatConfig.wholeFileExistsAction",
        "value" : "TO_ERROR"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.includeChecksumInTheEvents",
        "value" : false
      }, {
        "name" : "configs.dataGeneratorFormatConfig.checksumAlgorithm",
        "value" : "MD5"
      }, {
        "name" : "configs.dataGeneratorFormatConfig.xmlPrettyPrint",
        "value" : true
      }, {
        "name" : "configs.dataGeneratorFormatConfig.xmlValidateSchema",
        "value" : false
      }, {
        "name" : "configs.dataGeneratorFormatConfig.xmlSchema",
        "value" : null
      }, {
        "name" : "stageOnRecordError",
        "value" : "TO_ERROR"
      }, {
        "name" : "stageRequiredFields",
        "value" : [ ]
      }, {
        "name" : "stageRecordPreconditions",
        "value" : [ ]
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "local_sink",
        "xPos" : 471.2611083984375,
        "yPos" : 44.572269439697266,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ "KafkaConsumer_01OutputLane15081523550290" ],
      "outputLanes" : [ ],
      "eventLanes" : [ ]
    } ],
    "errorStage" : {
      "instanceName" : "WritetoFile_ErrorStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_recordstolocalfilesystem_ToErrorLocalFSDTarget",
      "stageVersion" : "1",
      "configuration" : [ {
        "name" : "directory",
        "value" : "/tmp/data_monitor/out/error"
      }, {
        "name" : "uniquePrefix",
        "value" : "sdc-${sdc:id()}"
      }, {
        "name" : "rotationIntervalSecs",
        "value" : "${1 * HOURS}"
      }, {
        "name" : "maxFileSizeMbs",
        "value" : 512
      } ],
      "uiInfo" : {
        "description" : "",
        "label" : "Error Records - Write to File",
        "xPos" : 622,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ]
    },
    "info" : {
      "pipelineId" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb",
      "title" : "connector_test_data_file2",
      "description" : "",
      "created" : 1508175563039,
      "lastModified" : 1508177203011,
      "creator" : "admin",
      "lastModifier" : "admin",
      "lastRev" : "0",
      "uuid" : "13fea9f9-5253-432b-8bf4-09dfbe763dcd",
      "valid" : true,
      "metadata" : {
        "labels" : [ ]
      },
      "name" : "connectorCHPGPLP2S20002datafile2d0f60f29-8175-4714-a60f-aa566e726ecb",
      "sdcVersion" : "2.7.1.1",
      "sdcId" : "a2869998-a9c9-11e7-94f2-31416694a1b6"
    },
    "metadata" : {
      "labels" : [ ]
    },
    "statsAggregatorStage" : {
      "instanceName" : "WritetoDPMdirectly_StatsAggregatorStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget",
      "stageVersion" : "1",
      "configuration" : [ ],
      "uiInfo" : {
        "description" : "",
        "label" : "Stats Aggregator - Write to DPM directly",
        "xPos" : 280,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ]
    },
    "startEventStages" : [ {
      "instanceName" : "Discard_StartEventStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget",
      "stageVersion" : "1",
      "configuration" : [ ],
      "uiInfo" : {
        "description" : "",
        "label" : "Start Event - Discard",
        "xPos" : 280,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ]
    } ],
    "stopEventStages" : [ {
      "instanceName" : "Discard_StopEventStage",
      "library" : "streamsets-datacollector-basic-lib",
      "stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget",
      "stageVersion" : "1",
      "configuration" : [ ],
      "uiInfo" : {
        "description" : "",
        "label" : "Stop Event - Discard",
        "xPos" : 280,
        "yPos" : 50,
        "stageType" : "TARGET"
      },
      "inputLanes" : [ ],
      "outputLanes" : [ ],
      "eventLanes" : [ ]
    } ],
    "valid" : true,
    "issues" : {
      "stageIssues" : { },
      "pipelineIssues" : [ ],
      "issueCount" : 0
    },
    "previewable" : true
  },
  "pipelineRules" : {
    "schemaVersion" : 3,
    "version" : 2,
    "metricsRuleDefinitions" : [ {
      "id" : "badRecordsAlertID",
      "alertText" : "High incidence of Error Records",
      "metricId" : "pipeline.batchErrorRecords.counter",
      "metricType" : "COUNTER",
      "metricElement" : "COUNTER_COUNT",
      "condition" : "${value() > 100}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1508152340733,
      "valid" : true
    }, {
      "id" : "stageErrorAlertID",
      "alertText" : "High incidence of Stage Errors",
      "metricId" : "pipeline.batchErrorMessages.counter",
      "metricType" : "COUNTER",
      "metricElement" : "COUNTER_COUNT",
      "condition" : "${value() > 100}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1508152340733,
      "valid" : true
    }, {
      "id" : "idleGaugeID",
      "alertText" : "Pipeline is Idle",
      "metricId" : "RuntimeStatsGauge.gauge",
      "metricType" : "GAUGE",
      "metricElement" : "TIME_OF_LAST_RECEIVED_RECORD",
      "condition" : "${time:now() - value() > 120000}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1508152340733,
      "valid" : true
    }, {
      "id" : "batchTimeAlertID",
      "alertText" : "Batch taking more time to process",
      "metricId" : "RuntimeStatsGauge.gauge",
      "metricType" : "GAUGE",
      "metricElement" : "CURRENT_BATCH_AGE",
      "condition" : "${value() > 200}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1508152340733,
      "valid" : true
    }, {
      "id" : "memoryLimitAlertID",
      "alertText" : "Memory limit for pipeline exceeded",
      "metricId" : "pipeline.memoryConsumed.counter",
      "metricType" : "COUNTER",
      "metricElement" : "COUNTER_COUNT",
      "condition" : "${value() > (jvm:maxMemoryMB() * 0.65)}",
      "sendEmail" : false,
      "enabled" : false,
      "timestamp" : 1508152340733,
      "valid" : true
    } ],
    "dataRuleDefinitions" : [ ],
    "driftRuleDefinitions" : [ ],
    "uuid" : "6ab2f9cf-0043-48e7-8e5f-1a3d2ec86d4d",
    "configuration" : [ {
      "name" : "emailIDs",
      "value" : [ ]
    }, {
      "name" : "webhookConfigs",
      "value" : [ ]
    } ],
    "configIssues" : [ ],
    "ruleIssues" : [ ]
  },
  "libraryDefinitions" : null
}

person Donat Fetisov    schedule 16.10.2017    source источник
comment
Покажи свой код!   -  person Clonkex    schedule 17.10.2017
comment
Не уверен, почему это может быть, но... Существует группа StreamSets Google, канал Slack и специальный сайт вопросов и ответов - см. streamsets .com/community — там вы можете напрямую взаимодействовать с сообществом.   -  person metadaddy    schedule 17.10.2017
comment
Я не использую никакого кода, я просто использую визуальный редактор - я получил источник Kafka Consumer и сразу после этого пункт назначения Local FS.   -  person Donat Fetisov    schedule 17.10.2017
comment
Что получится, если вы прочитаете сообщение с помощью инструментов командной строки Kafka? Пример: консольный потребитель?   -  person Jeff Evans    schedule 18.10.2017
comment
Я тестировал с помощью обычных инструментов Kafka - дата указана в миллисекундах, и все в порядке - если вы конвертируете - это правильное местное время и нет разницы   -  person Donat Fetisov    schedule 19.10.2017
comment
Что именно представляет собой эта дата-время? Это конкретное поле (угадывание формата Avro) в вашем сообщении? Можете ли вы показать скриншот результата предварительного просмотра, показывающий неправильное значение? Если вы записываете данные в место назначения (например, в локальную файловую систему), они также неверны или только в пользовательском интерфейсе?   -  person Jeff Evans    schedule 19.10.2017
comment
Это связано с вопросом, который вы задали в канале Slack?   -  person metadaddy    schedule 20.10.2017


Ответы (2)


Для протокола, мы выяснили это в другом месте. Это было связано с шаблонами даты Java .

Анализ строки с миллисекундами, например 2017-10-20 13:15:33.611796, с выражением типа ${time:extractDateFromString(record:value("/ts"),"yyyy-MM-dd hh:mm:ss.SSSSSS")} работает не так, как вы могли ожидать. S означает миллисекунды, поэтому эти 611 796 микросекунд интерпретируются как 611 796 миллисекунд, а 611 секунд (чуть более 10 минут) добавляются ко времени, в результате чего получается объект Date со значением 2017-10-20 01:25:44.796.

Обрезка микросекунд из входящей строки, например: ${time:extractDateFromString(str:substring(record:value("/ts"), 0, 23),"yyyy-MM-dd hh:mm:ss.SSS")} работает. При необходимости микросекунды могут быть захвачены с помощью ${str:substring(record:value("/ts"), 23, 26)}

person metadaddy    schedule 01.11.2017

Я столкнулся с очень похожей проблемой при запуске служб в Docker для Mac. Существует давняя известная проблема, из-за которой системное время виртуальной машины отличается от времени хост-компьютера, и ее можно решить, перезапустив Docker. Есть вероятность, что это может повлиять и на вас?

person Dima Spivak    schedule 18.10.2017
comment
Я запускаю потоковые наборы на сервере CentOS, Kafka на другом, также на CentOS. Докер не используется. Думаю это не мой случай. - person Donat Fetisov; 19.10.2017