<acronym id="s8ci2"><small id="s8ci2"></small></acronym>
<rt id="s8ci2"></rt><rt id="s8ci2"><optgroup id="s8ci2"></optgroup></rt>
<acronym id="s8ci2"></acronym>
<acronym id="s8ci2"><center id="s8ci2"></center></acronym>
0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

什么是pipeline?Go中構建流數據pipeline的技術

馬哥Linux運維 ? 來源:華為云社區 ? 2024-03-11 10:16 ? 次閱讀

介紹

Go 的并發原語可以輕松構建流數據管道,從而高效利用 I/O 和多個 CPU。 本文展示了此類pipelines的示例,強調了操作失敗時出現的細微之處,并介紹了干凈地處理失敗的技術。

什么是pipeline?

pipeline在Go中并沒有書面的定義,只是眾多并發程序中的一種。非正式地,pipeline由一系列stage組成。每個stage是運行著同一個function的協程組。在每個stage,協程們

通過inbound channel從上游獲取數據

在data上進行運算,通常會產生新的值

通過outbound channel向下游發送數據

每個Stage都有數個inbound channel和outbound channel,除了第一個和最后一個Stage,分別只有outbound和inbound channel。第一個Stage通常叫做Source或Producer。最后一個Stage通常叫做Sink或Consumer。

我們將從一個簡單的示例pipeline開始來解釋這些想法和技術。 稍后,我們將提供一個更實際的例子。

Squaring numbers 平方數

考慮一個有著三個階段的流水線。

第一階段,gen,是個將整數列表轉換為一個發射列表中整數的channel的函數。gen函數啟動一個go routine,用來發送channel中的整數,然后當所有的整數都被發出后,將channel關閉:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二階段,sq從上面的channel中接收數據,返回一個發射對應整數平方數的channel。當inbound channel關閉后,并且這一階段將所有的value發送到下游后,再將這個outbound channel關閉

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main函數組織整個pipeline,并且運行最終的stage:從第二個stage中接收數據然后逐個打印,直到channel被關閉


func main() {
    // Set up the pipeline
    c := gen(2, 3)
    out := sq(c)


    // Consume the output
    // 4
    fmt.Println(<-out)
    // 9
    fmt.Println(<-out)
}

既然sq的inbound channel和outbound channel類型相同,我們可以將其進行任意數量的組合。我們還可以將main函數重寫為循環,就像在其他Stage中做的那樣一樣。


func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

扇入和扇出

許多函數可以從一個channel中獲取數據直到channel被關閉,這被叫做扇出。這提供了一種在worker之間分配工作以并行化 CPU 使用和 I/O 的方法。

一個函數可以通過將多個input channel多路復用到同一個channel,當所有的channel關閉時,該多路復用channel才關閉。從而達到從多個input獲取數據并處理,直到所有input channel都關閉才停止的效果。這叫做扇入。

我們可以將我們的流水線改為運行兩個sq,每個都從相同的channel讀取數據。我們引入一個新的函數merge,來做扇入的工作

func main() {

    in := gen(2, 3)


    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)


    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge函數通過對每個channel開啟一個協程,把數據拷貝到另一個out channel中,實現將channel列表轉換為一個channel的效果。當所有send操作完成后,再將out channel關閉。

向一個已經關閉上的channel發送數據會導致panic,所以保證發送完所有再關閉channel至關重要。sync.WaitGroup提供了一個簡單地方式來編排這個同步

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)


    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

短暫的停頓

我們的pipeline函數有這樣的模式:

當發送任務結束后,關閉發送output channel

直到input channel關閉前,一直從input channel中接收消息

這個模式下,每個階段都可以用協程+for循環的模式來書寫,保證每個數據發送到下游后再關閉所有協程。

但是在實際的pipeline中,階段并不總是接收所有來自inbound channel的數據。通常,如果inbound的值出現了錯誤,pipeline會提前退出。 在任何一種情況下,接收者都不必等待剩余值到達,并且我們希望fast fail(較早階段的Stage盡早停止后期Stage不需要的值)。

在我們的示例pipeline中,如果一個Stage未能消費所有inbound值,則嘗試計算后并發送這些值的 goroutine 將無限期阻塞:

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}

這就導致了資源泄漏:協程消耗內存、運行資源,并且在協程棧內的golang堆引用導致垃圾無法回收。協程只能自己退出,不能由垃圾回收機制回收。

即使下游的Stage無法接收所有inbound value,我們也需要把上游的協程退出。如果把上游的協程改為有buffer的,可以解決上面的問題。如果Buffer中還有空間,則發送操作可以立刻完成

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1

當要發送的數目可以在channel創建時知道時,buffer可以簡化代碼。舉個例子,讓我們來使用buffer channel,不開辟新的協程來重寫gen方法:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

在我們的pipeline中,我們就需要在merge方法中使用的channel添加buffer:

func merge(cs ...<-chan int) <-chan int {

    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... 其余的沒有變更 ...

盡管上面這個方案修復了阻塞的問題,但它是很差的方案。這里有一個對1的硬編碼,這太脆弱了?你真的能預料到有多少個值不能被正常發送嗎?一旦兩個值不能正常發送,你的協程又阻塞了。

作為替代,我們需要給下游階段提供一個機制,知會下游階段,發送者已經停止發送了。

Explicity cancellation 顯示取消

當main函數決定不從out處接收所有數據,而是退出時,它必須知會上游階段的協程放棄接下來的發送。它通過向一個名叫done的channel發送數據來完成這個動作。因為發送方有兩個,所以 向done發送兩次數據。


func main() {
    in := gen(2, 3)


    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)


    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9


    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

發送到out channel的發送者把原來的邏輯替換成一個select操作,select或者發送一個數據,抑或從done處接收到數據。因為done中數據值的類型根本不重要,主要是接收到值這個事件本身很重要,所以donechannel的類型時struct {}。output循環繼續在inboundchannel上執行,所以上游的階段并沒有被阻塞。(我們稍后會討論如何讓循環迅速返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)


    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

這個方法有一個問題:每一個下游接收者都需要知道可能阻塞的上游發送者總數。維護它們的數目,是一個瑣碎又容易出錯的事情。

我們需要一個機制來讓不可知的、無界的發送協程來停止發送到下游的值。在Go,我們可以通過關閉channel來完成這件事,因為在已經關閉的channel上執行receive操作,會立刻返回該元素的零值。

這說明main函數可以簡單地通過關閉donechannel來讓所有的發送者不阻塞。關閉操作是一個高效的廣播。我們把pipeline中的每個函數都接受done作為參數,并把done在defer語句中關閉, 這樣,如果在main函數中返回,都會通知pipeline中的階段退出。


func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)


    in := gen(done, 2, 3)


    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)


    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9


    // done will be closed by the deferred call.
}

現在當donechannel關閉后,接收到close信息的階段,都可以直接退出了。merge函數中的outout協程可以不從inboundchannel中取數據直接退出,因為它知道,上游的發送sq,接收到close信息,也會直接退出。output通過defer語句來保證wg.Done()一定被調用。(譯者注:來關閉out channel)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)


    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

相似的,當接收到close信號時,sq函數也可以立刻返回。sq通過defer語句來保證outchannel一定被關閉。

這是給構建pipeline的一些指導:

當所有的發送操作完成后,關閉outbound channel

如果發送發不阻塞,或是channel沒有關閉,接收者會一直從channel中接收數據

Pipeline通過如下兩個方式來解除發送者的阻塞

確保channel的buffer足夠大

顯示知會發送者,接收者已經放棄接收

Digesting a tree 對樹進行摘要

讓我們來考慮一個更實際的pipeline

MD5 是一種消息摘要算法,可用作文件校驗和。 命令行實用程序 md5sum 打印文件列表的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我們的示例程序類似于 md5sum,但將單個目錄作為參數并打印該目錄下每個常規文件的摘要值,按路徑名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我們的主函數調MD5All這個輔助函數,返回路徑名和摘要值的map,main函數再將它們排序打印


func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s
", m[path], path)
    }
}

MD5All函數是我們討論的重點。在如下串行化的實現中,沒有使用并發技術,只是簡單對文件進行了遍歷


// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行計算摘要

在并行的解法中,我們將MD5All分割為兩個階段的pipeline。第一個階段,sumFiles,遍歷文件樹,針對每個文件,在新的協程中計算摘要,然后把結果發送到channel中,這是result的類型

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles返回兩個channel:一個是result channel,另一個是filepath.Walk中產生的錯誤。walk函數針對每個文件啟動一個新的協程來處理,然后檢查donechannel。如果done已經被關閉,walk函數會立刻停止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and
    // sends the result on c.
    // Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        // If any error occurred, walk method will return
        err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{
                    path: path,
                    sum:  md5.Sum(data),
                    err:  err,
                }:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.
        // Start a goroutine to close c once all the sends are done.
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5All從c中接收到摘要數據。當發生錯誤時,MD5All會迅速返回,通過defer語句來關閉donechannel

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)


    c, errc := sumFiles(done, root)


    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

有界的并行

parallel.go 中的 MD5All 實現為每個文件啟動一個新的 goroutine。 在包含許多大文件的目錄中,這可能會分配比機器上可用的內存更多的內存。

我們可以通過限制并行讀取的文件數量來限制這些分配。 在新的解決方式中,我們通過創建固定數量的 goroutine 來讀取文件來做到這一點。 我們的pipeline現在分為三個階段:遍歷樹、讀取并計算文件摘要以及收集摘要。

第一階段 walkFiles 發射出文件樹中常規文件的路徑:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

第二階段啟動固定數量的協程來計算文件摘要,然后發送到c channel中

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

和之前的示例不同,因為多個協程都在共享channel上發送數據,digester函數并沒有關閉output channel。作為替代,當所有的digesters跑完之后,MD5All會關閉channel

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

這里也可以針對每個digester開啟獨立的channel,不過到時候就要對channel進行扇入處理。

最終階段從c中取得所有結果,并且檢查errc中的錯誤。此檢查不能更早發生,因為在此之前,walkFiles 可能會阻塞:

(譯者注:要保證檢查errc的錯誤,發生在filePath.Walk啟動后,done不會再次發送了,協程就不會退出)

   m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

總結

本文介紹了在 Go 中構建流數據pipeline的技術。 處理此類pipeline中的故障很棘手,因為pipeline中的每個階段可能會阻止嘗試向下游發送值,并且下游階段可能不再關心傳入的數據。 我們展示了關閉通道如何向管道啟動的所有 goroutine 廣播“done”信號,并定義了正確構建管道的指南。

審核編輯:黃飛

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • Pipeline
    +關注

    關注

    0

    文章

    27

    瀏覽量

    9301

原文標題:實例詳解在Go中構建流數據pipeline的騷操作

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    一覽pipeline中所出現的五個基本要素

    stageable、StageableKey是最整個pipeline中的基本數據類型元素
    的頭像 發表于 08-12 11:24 ?678次閱讀
    一覽<b class='flag-5'>pipeline</b>中所出現的五個基本要素

    rs2::pipeline start()的默認配置是什么?

    想知道這一點的原因,就是看我是否可以優化從相機接收到的深度質量。我目前正在使用D415和D435創建點云對象并使用Point-Cloud Library處理它們。這使得深度數據盡可能精確對我來說非常重要
    發表于 11-07 11:17

    求助,能否在一個pipeline添加多個音頻輸入流?

    能否在一個pipeline添加多個音頻輸入流[,例如httpstream flash_tone_stream,因為音頻的輸入方向有兩個?;蛘吣芊襁M行pipeline的時間復用,覺得設置兩個播放的
    發表于 03-10 08:09

    希捷為高畫質錄像機打造Pipeline HD系列專用硬盤機

    希捷為高畫質錄像機打造Pipeline HD系列專用硬盤機 希捷科技(Seagate Technology)宣布在2009國際消費性電子展(CES)發表二款數字錄像機(DVR)專用的Seagate Pipeline HD系列新產品:P
    發表于 01-19 10:13 ?1161次閱讀

    Pipeline ADCs Come of Age

    Pipeline ADCs Come of Age Abstract: In the mid 1970s, a new data converter architecture
    發表于 04-16 16:21 ?996次閱讀
    <b class='flag-5'>Pipeline</b> ADCs Come of Age

    Pipeline ADCs Come of Age

    and mixed-signal community, called pipeline ADCs. The following article takes the knowledge of advantages and disadvantages of the pipeline
    發表于 04-25 10:22 ?1014次閱讀
    <b class='flag-5'>Pipeline</b> ADCs Come of Age

    14位Pipeline ADC設計的帶隙電壓基準源技術

    14位Pipeline ADC設計的帶隙電壓基準源技術 目前,基準電壓源被廣泛應用與高精度比較器,
    發表于 04-23 09:42 ?3544次閱讀
    14位<b class='flag-5'>Pipeline</b> ADC設計的帶隙電壓基準源<b class='flag-5'>技術</b>

    關于pipeline 以及 unroll 指令的介紹

    HLS 優化設計的最關鍵指令有兩個:一個是流水線 (pipeline) 指令,一個是數據流(dataflow) 指令。正確地使用好這兩個指令能夠增強算法地并行性,提升吞吐量,降低延遲但是需要遵循一定的代碼風格。
    的頭像 發表于 02-09 09:53 ?2082次閱讀
    關于<b class='flag-5'>pipeline</b> 以及 unroll 指令的介紹

    修改V4L2的Video Pipeline的devicetree

    PetaLinux 能夠根據Vivado的設計,自動生成V4L2的Video Pipeline的devicetree。但是它主要為Xilinx的VCU TRD服務,測試的組合比較少。很多時候,需要根據自己的工程,修改V4L2的Video Pipeline的devicetr
    的頭像 發表于 08-02 08:03 ?1785次閱讀
    修改V4L2的Video <b class='flag-5'>Pipeline</b>的devicetree

    一圖讀懂CodeArts Pipeline全新升級,5大特性使能企業研發治理

    2023 年2月27日,華為云正式發布流水線服務 CodeArts Pipeline ,旨在提升編排體驗,開放插件平臺,并提供標準化的DevOps企業治理模型,將華為公司內的優秀研發實踐賦能給伙伴
    的頭像 發表于 03-25 07:50 ?792次閱讀
    一圖讀懂CodeArts <b class='flag-5'>Pipeline</b>全新升級,5大特性使能企業研發治理

    SpinalHDL里pipeline的設計思路

    如果你曾看過VexRSICV的設計,對于從事邏輯設計的你會驚訝從未想過邏輯設計還能這么來做。針對VexRSICV所衍生出的pipeline Lib,該系列會對pipeline進行一次梳理。誠如之前一篇博客曾講,這是“勇者的游戲”。
    的頭像 發表于 08-16 15:11 ?744次閱讀
    SpinalHDL里<b class='flag-5'>pipeline</b>的設計思路

    開箱即用!教你如何正確使用華為云CodeArts Pipeline!

    軟件持續交付流水線是一個可視化的自動化任務編排調度平臺,串聯編譯構建、代碼檢查、自動化測試、部署發布等任務,承載軟件從代碼提交到發布上線全自動化流程。一次配置后即可重復觸發執行,避免頻繁低效
    的頭像 發表于 08-30 11:20 ?1196次閱讀
    開箱即用!教你如何正確使用華為云CodeArts <b class='flag-5'>Pipeline</b>!

    Pipeline中throwIt的用法

    字如其名,來看下Pipeline中throwIt的用法,是怎么個丟棄方式。
    的頭像 發表于 10-21 16:24 ?312次閱讀
    <b class='flag-5'>Pipeline</b>中throwIt的用法

    如何在zcu102板卡上創建pipeline呢?

    DisplayPort 1.4 Tx Subsystem core的最簡pipeline就是如它的linux driver wiki page里的figure-4那樣,framebuffer_read+DP+video_phy。
    的頭像 發表于 12-29 10:09 ?258次閱讀

    淺析SpinalHDL中Pipeline中的復位定制

    之前有系列文章介紹了SpinalHDL中Pipeline的使用,最近在一個功能模塊中真實的使用了這個lib。
    的頭像 發表于 03-17 17:31 ?611次閱讀
    淺析SpinalHDL中<b class='flag-5'>Pipeline</b>中的復位定制
    亚洲欧美日韩精品久久_久久精品AⅤ无码中文_日本中文字幕有码在线播放_亚洲视频高清不卡在线观看
    <acronym id="s8ci2"><small id="s8ci2"></small></acronym>
    <rt id="s8ci2"></rt><rt id="s8ci2"><optgroup id="s8ci2"></optgroup></rt>
    <acronym id="s8ci2"></acronym>
    <acronym id="s8ci2"><center id="s8ci2"></center></acronym>