Как правильно работать с NSStreams (без блокировки, чтение до конца данных и повторная попытка сообщения)?

Я пытаюсь работать с NSInputStream и NSOutputStream, но это причиняет много боли.

У меня есть два устройства, которые обмениваются данными Json. Некоторые данные могут быть очень длинными, поэтому NSOutputStreamsends разбивает их на несколько пакетов.

Мне нужно, чтобы получение не блокировалось в основном потоке и могло читать все необходимые пакеты json, прежде чем пытаться его проанализировать. Затем продолжите чтение остальных пакетов данных json.

Мне нужно, чтобы отправка не блокировалась в основном потоке и могла завершить отправку данных, если первая партия не была отправлена. Затем продолжите отправку остальных данных json.

Я использую swift, но также могу использовать и цель c.

Вот код до сих пор. Мой основной вспомогательный класс потока:

public class StreamHelper : NSObject, NSStreamDelegate {
    static let DATA_BYTE_LENGTH = 4;

    public static func writeToOutputStream(text: String!, outputStream:NSOutputStream!) -> Int!{
        let encodedDataArray = [UInt8](text.utf8)

        var count: Int = encodedDataArray.count.littleEndian
        //convert int to pointer, which is required for the write method.
        withUnsafePointer(&count) { (pointer: UnsafePointer<Int>) -> Void in
            outputStream.write(UnsafePointer<UInt8>(pointer), maxLength: DATA_BYTE_LENGTH)
        }
        let bytesWritten = outputStream.write(encodedDataArray, maxLength: encodedDataArray.count)
        return bytesWritten;
    }

    public static func readFromInputStream(inputStream: NSInputStream!) -> String!{
        var buffer = [UInt8](count: 4096, repeatedValue: 0)
        var text = ""

        while (inputStream.hasBytesAvailable){
            let len = inputStream!.read(&buffer, maxLength: buffer.count)
            if(len > 0){
                if let output = NSString(bytes: &buffer, length: buffer.count, encoding: NSUTF8StringEncoding) as? String{
                    if (!output.isEmpty){
                        text += output
                    }
                }
            }
        }
        return text
    }
}

Основной код:

public func stream(aStream: NSStream, handleEvent eventCode: NSStreamEvent) {
    print("Reading from stream... ")
    switch (eventCode){
        case NSStreamEvent.ErrorOccurred:
            print("ErrorOccurred")
            break
        case NSStreamEvent.None:
            print("None")
            break
        case NSStreamEvent.EndEncountered:
            print("EndEncountered")
            if((aStream == inputStream) && inputStream!.hasBytesAvailable){
                // If all data hasn't been read, fall through to the "has bytes" event
            } else{
                break
            }
        case NSStreamEvent.HasBytesAvailable:
            print("HasBytesAvaible")
            let methodJson = StreamHelper.readFromInputStream(inputStream!)
            if(!methodJson.isEmpty){
                let cMethodJson = methodJson.cStringUsingEncoding(NSUTF8StringEncoding)!
                let returnedJsonString = String.fromCString(callMethod(cMethodJson))
                StreamHelper.writeToOutputStream(returnedJsonString, outputStream: outputStream!)
            }
            break
        case NSStreamEvent.OpenCompleted:
            print("OpenCompleted")
            break
        case NSStreamEvent.HasSpaceAvailable:
            print("HasSpaceAvailable")

            if(aStream == outputStream){
            }
            break
        default:
            break
    }
}

Некоторый код установки:

func connectToService(service: NSNetService!){

service.getInputStream(&inputStream, outputStream: &outputStream)

inputStream!.delegate = self
outputStream!.delegate = self

inputStream!.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
outputStream!.scheduleInRunLoop(NSRunLoop.currentRunLoop(), forMode: NSDefaultRunLoopMode)
inputStream!.open()
outputStream!.open()
}

Как правильно работать с NSStreams или есть лучшее решение, чем использование NSStreams?


person Daniel Ryan    schedule 18.11.2015    source источник
comment
почему NSStreams? Будут ли сокеты или запуск небольшого HTTP-сервера с bonjour вариантом?   -  person vikingosegundo    schedule 05.01.2016
comment
Я использую Bonjour и сокеты для подключения. Есть ли что-то еще, кроме NSStreams, которые я должен использовать для установки/получения данных?   -  person Daniel Ryan    schedule 05.01.2016
comment
зачем вообще? json не кажется мне хорошим форматом для потоковой передачи.   -  person vikingosegundo    schedule 05.01.2016
comment
Все общение осуществляется с помощью Json. Этот бит нельзя изменить.   -  person Daniel Ryan    schedule 05.01.2016
comment
Я не сомневаюсь в json. Я сомневаюсь, чтобы использовать nsstream.   -  person vikingosegundo    schedule 05.01.2016
comment
Я тоже сомневаюсь в NSStream. Я добавил некоторый код установки выше. Если вы можете придумать другой способ сделать это, я весь внимание.   -  person Daniel Ryan    schedule 06.01.2016
comment
Как я уже сказал: сокеты или http-сервер, работающие в вашем приложении.   -  person vikingosegundo    schedule 06.01.2016
comment
Я уже использую Bonjour, который использует сокеты. Да, http-сервер можно сделать, но для мобильного приложения это кажется странным. Вероятно, это будет большая работа, так что, может быть, я мог бы заняться этим, если ничего не получится. Спасибо.   -  person Daniel Ryan    schedule 06.01.2016
comment
Почему это должна быть большая работа. Вам не нужно устанавливать Apache. Я использовал этот подход в нескольких приложениях — безупречно.   -  person vikingosegundo    schedule 06.01.2016
comment
О, приятно слышать. Просто я должен координировать свои действия с третьей стороной, которая создает аппаратное устройство, которое мы связываем с нашим приложением.   -  person Daniel Ryan    schedule 06.01.2016
comment
Я бы использовал GCDAsyncSocket или аналогичный или http-сервер как быстрее   -  person vikingosegundo    schedule 06.01.2016
comment
@vikingosegundo GCDAsyncSocket, кажется, имеет именно то, что мне нужно. Поставь это как ответ, и я дам тебе награду. :)   -  person Daniel Ryan    schedule 07.01.2016


Ответы (3)


Вы, наверное, слишком много работаете здесь. NSStreamDelegate был разработан до GCD, когда большая часть работы Cocoa выполнялась в одном потоке. Хотя в некоторых случаях все еще есть причины использовать его, в большинстве случаев GCD и синхронные методы сделают это намного проще. Например, чтобы прочитать, вы должны сделать что-то вроде этого:

import Foundation

enum StreamError: ErrorType {
    case Error(error: NSError?, partialData: [UInt8])
}

func readStream(inputStream: NSInputStream) throws -> [UInt8] {
    let bufferSize = 1024
    var buffer = [UInt8](count: bufferSize, repeatedValue: 0)
    var data: [UInt8] = []

    while true {
        let count = inputStream.read(&buffer, maxLength: buffer.capacity)

        guard count >= 0 else {
            inputStream.close()
            throw StreamError.Error(error: inputStream.streamError, partialData: data)
        }

        guard count != 0 else {
            inputStream.close()
            return data
        }

        data.appendContentsOf(buffer.prefix(count))
    }
}

let textPath = NSBundle.mainBundle().pathForResource("text.txt", ofType: nil)!
let inputStream = NSInputStream(fileAtPath: textPath)!
inputStream.open()
do {
    let data = try readStream(inputStream)
    print(data)
} catch let err {
    print("ERROR: \(err)")
}

Это наверняка заблокирует текущую очередь. Так что не запускайте его в основной очереди. Поместите блок do в блок dispatch_async. Если позже вам понадобятся данные из основной очереди, dispatch_async верните их обратно, как и любой другой фоновый процесс.

person Rob Napier    schedule 05.01.2016
comment
Мне все равно нужно, чтобы NSStreamDelegate проверял наличие данных в сокете, верно? Да, использование dispatch_async поможет с блокировкой, спасибо. Но все еще застрял в обработке json, если пакеты разделены. Обработка очереди json для отправки и проверка отправки всех данных (проверка bytesWritten). - person Daniel Ryan; 06.01.2016
comment
Вызов read блокируется до тех пор, пока не прибудет первый байт, поэтому вам не нужен делегат, чтобы дождаться этого. Вызов write возвращает количество записанных байтов, что является информацией, необходимой для создания повторной попытки. Если возможно, я бы попытался думать только о данных на этом уровне, а не рассматривать их как JSON. Убедитесь, что вы можете надежно отправлять и получать произвольные байты. Затем на следующем уровне позаботьтесь о преобразовании его в сообщения JSON, а выше — в структуры данных. - person Rob Napier; 06.01.2016

Вместо работы с NSStreams я бы работал с прямой отправкой через сокеты.
Существуют оболочки, облегчающие эту задачу, в первую очередь GCDAsyncSocket.

person vikingosegundo    schedule 07.01.2016

Основываясь на других ответах, я придумал это, что работает в Swift 4.2.

public enum StreamError: Error {
    case Error(error: Error?, partialData: [UInt8])
}

extension InputStream {

    public func readData(bufferSize: Int = 1024) throws -> Data {
        var buffer = [UInt8](repeating: 0, count: bufferSize)
        var data: [UInt8] = []

        open()

        while true {
            let count = read(&buffer, maxLength: buffer.capacity)

            guard count >= 0 else {
                close()
                throw StreamError.Error(error: streamError, partialData: data)
            }

            guard count != 0 else {
                close()
                return Data(bytes: data)
            }

            data.append(contentsOf: (buffer.prefix(count)))
        }

    }
}
person Mark Bridges    schedule 14.03.2019