Oxy with gorilla for websocket(+integration tests)

This commit is contained in:
Julien Salleyron 2017-07-25 16:56:46 +02:00 committed by SALLEYRON Julien
parent a09a8b1235
commit 888e6dcbc8
7 changed files with 184 additions and 81 deletions

4
glide.lock generated
View file

@ -1,4 +1,4 @@
hash: df3bba260c0e5c3183741ab4aca2ae551a5c6d9ba11f4e05b90554a9ffad96ad
hash: bfc5801ed56be5f703a0924d8832dcccc42bf02f9e2b035ef77eab62c0cb4884
updated: 2017-06-29T16:47:14.848940186+02:00
imports:
- name: cloud.google.com/go
@ -411,7 +411,7 @@ imports:
- name: github.com/vdemeester/docker-events
version: be74d4929ec1ad118df54349fda4b0cba60f849b
- name: github.com/vulcand/oxy
version: 7da864c1d53bd58165435bb78bbf8c01f01c8f4a
version: 49f1894c20d972f5c73ff44b859f87deb83f0076
repo: https://github.com/containous/oxy.git
vcs: git
subpackages:

View file

@ -8,7 +8,7 @@ import:
- package: github.com/cenk/backoff
- package: github.com/containous/flaeg
- package: github.com/vulcand/oxy
version: 7da864c1d53bd58165435bb78bbf8c01f01c8f4a
version: 49f1894c20d972f5c73ff44b859f87deb83f0076
repo: https://github.com/containous/oxy.git
vcs: git
subpackages:

View file

@ -0,0 +1,24 @@
defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints]
[entryPoints.http]
address = ":8000"
[web]
address = ":8080"
[file]
[backends]
[backends.backend1]
[backends.backend1.servers.server1]
url = "{{ .WebsocketServer }}"
[frontends]
[frontends.frontend1]
backend = "backend1"
[frontends.frontend1.routes.test_1]
rule = "Path:/ws"

View file

@ -14,6 +14,8 @@ import (
"github.com/containous/traefik/integration/utils"
"github.com/go-check/check"
"bytes"
compose "github.com/libkermit/compose/check"
checker "github.com/vdemeester/shakers"
)
@ -38,6 +40,7 @@ func init() {
check.Suite(&EurekaSuite{})
check.Suite(&AcmeSuite{})
check.Suite(&DynamoDBSuite{})
check.Suite(&WebsocketSuite{})
}
var traefikBinary = "../dist/traefik"
@ -71,6 +74,18 @@ func (s *BaseSuite) createComposeProject(c *check.C, name string) {
s.composeProject = compose.CreateProject(c, projectName, composeFile)
}
func withConfigFile(file string) string {
return "--configFile=" + file
}
func (s *BaseSuite) cmdTraefik(args ...string) (*exec.Cmd, *bytes.Buffer) {
cmd := exec.Command(traefikBinary, args...)
var out bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &out
return cmd, &out
}
func (s *BaseSuite) traefikCmd(c *check.C, args ...string) (*exec.Cmd, string) {
cmd, out, err := utils.RunCommand(traefikBinary, args...)
c.Assert(err, checker.IsNil, check.Commentf("Fail to run %s with %v", traefikBinary, args))

View file

@ -0,0 +1,81 @@
package main
import (
"net/http"
"net/http/httptest"
"time"
"github.com/go-check/check"
"errors"
"io/ioutil"
"os"
"strings"
"github.com/containous/traefik/integration/utils"
"github.com/gorilla/websocket"
checker "github.com/vdemeester/shakers"
)
// WebsocketSuite
type WebsocketSuite struct{ BaseSuite }
func (suite *WebsocketSuite) TestBase(c *check.C) {
var upgrader = websocket.Upgrader{} // use default options
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer c.Close()
for {
mt, message, err := c.ReadMessage()
if err != nil {
break
}
err = c.WriteMessage(mt, message)
if err != nil {
break
}
}
}))
file := suite.adaptFile(c, "fixtures/websocket/config.toml", struct {
WebsocketServer string
}{
WebsocketServer: srv.URL,
})
defer os.Remove(file)
cmd, _ := suite.cmdTraefik(withConfigFile(file), "--debug")
err := cmd.Start()
c.Assert(err, check.IsNil)
defer cmd.Process.Kill()
// wait for traefik
err = utils.TryRequest("http://127.0.0.1:8080/api/providers", 60*time.Second, func(res *http.Response) error {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
if !strings.Contains(string(body), "127.0.0.1") {
return errors.New("Incorrect traefik config")
}
return nil
})
c.Assert(err, checker.IsNil)
conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:8000/ws", nil)
c.Assert(err, checker.IsNil)
conn.WriteMessage(websocket.TextMessage, []byte("OK"))
_, msg, err := conn.ReadMessage()
c.Assert(err, checker.IsNil)
c.Assert(string(msg), checker.Equals, "OK")
}

View file

@ -4,18 +4,16 @@
package forward
import (
"bufio"
"crypto/tls"
"io"
"net"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/vulcand/oxy/utils"
)
@ -255,77 +253,47 @@ func (f *httpForwarder) copyRequest(req *http.Request, u *url.URL) *http.Request
// serveHTTP forwards websocket traffic
func (f *websocketForwarder) serveHTTP(w http.ResponseWriter, req *http.Request, ctx *handlerContext) {
outReq := f.copyRequest(req, req.URL)
host := outReq.URL.Host
dial := net.Dial
// if host does not specify a port, use the default http port
if !strings.Contains(host, ":") {
if outReq.URL.Scheme == "wss" {
host = host + ":443"
} else {
host = host + ":80"
}
dialer := websocket.DefaultDialer
if outReq.URL.Scheme == "wss" && f.TLSClientConfig != nil {
dialer.TLSClientConfig = f.TLSClientConfig
}
if outReq.URL.Scheme == "wss" {
if f.TLSClientConfig == nil {
f.TLSClientConfig = http.DefaultTransport.(*http.Transport).TLSClientConfig
}
dial = func(network, address string) (net.Conn, error) {
return tls.Dial("tcp", host, f.TLSClientConfig)
}
}
targetConn, err := dial("tcp", host)
targetConn, resp, err := dialer.Dial(outReq.URL.String(), outReq.Header)
if err != nil {
ctx.log.Errorf("Error dialing `%v`: %v", host, err)
ctx.log.Errorf("Error dialing `%v`: %v", outReq.Host, err)
ctx.errHandler.ServeHTTP(w, req, err)
return
}
hijacker, ok := w.(http.Hijacker)
if !ok {
ctx.log.Errorf("Unable to hijack the connection: %v", reflect.TypeOf(w))
ctx.errHandler.ServeHTTP(w, req, nil)
return
}
underlyingConn, _, err := hijacker.Hijack()
upgrader := websocket.Upgrader{}
utils.RemoveHeaders(resp.Header, WebsocketUpgradeHeaders...)
underlyingConn, err := upgrader.Upgrade(w, req, resp.Header)
if err != nil {
ctx.log.Errorf("Unable to hijack the connection: %v %v", reflect.TypeOf(w), err)
ctx.errHandler.ServeHTTP(w, req, err)
ctx.log.Errorf("Error while upgrading connection : %v", err)
return
}
// it is now caller's responsibility to Close the underlying connection
defer underlyingConn.Close()
defer targetConn.Close()
ctx.log.Infof("Writing outgoing Websocket request to target connection: %+v", outReq)
// write the modified incoming request to the dialed connection
if err = outReq.Write(targetConn); err != nil {
ctx.log.Errorf("Unable to copy request to target: %v", err)
ctx.errHandler.ServeHTTP(w, req, err)
return
errc := make(chan error, 2)
replicate := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errc <- err
}
br := bufio.NewReader(targetConn)
resp, err := http.ReadResponse(br, req)
resp.Write(underlyingConn)
defer resp.Body.Close()
go replicate(targetConn.UnderlyingConn(), underlyingConn.UnderlyingConn())
// We connect the conn only if the switching protocol has not failed
if resp.StatusCode == http.StatusSwitchingProtocols {
ctx.log.Infof("Switching protocol success")
errc := make(chan error, 2)
replicate := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
errc <- err
}
go replicate(targetConn, underlyingConn)
go replicate(underlyingConn, targetConn)
<-errc
// Try to read the first message
t, msg, err := targetConn.ReadMessage()
if err != nil {
ctx.log.Errorf("Couldn't read first message : %v", err)
} else {
ctx.log.Infof("Switching protocol failed")
underlyingConn.WriteMessage(t, msg)
}
go replicate(underlyingConn.UnderlyingConn(), targetConn.UnderlyingConn())
<-errc
}
// copyRequest makes a copy of the specified request.
@ -335,6 +303,7 @@ func (f *websocketForwarder) copyRequest(req *http.Request, u *url.URL) (outReq
outReq.URL = utils.CopyURL(req.URL)
outReq.URL.Scheme = u.Scheme
outReq.URL.Path = outReq.RequestURI
//sometimes backends might be registered as HTTP/HTTPS servers so translate URLs to websocket URLs.
switch u.Scheme {
@ -345,19 +314,12 @@ func (f *websocketForwarder) copyRequest(req *http.Request, u *url.URL) (outReq
}
outReq.URL.Host = u.Host
outReq.URL.Opaque = req.RequestURI
// raw query is already included in RequestURI, so ignore it to avoid dupes
outReq.URL.RawQuery = ""
outReq.Proto = "HTTP/1.1"
outReq.ProtoMajor = 1
outReq.ProtoMinor = 1
// Overwrite close flag so we can keep persistent connection for the backend servers
outReq.Close = false
outReq.Header = make(http.Header)
utils.CopyHeaders(outReq.Header, req.Header)
utils.RemoveHeaders(outReq.Header, WebsocketDialHeaders...)
if f.rewriter != nil {
f.rewriter.Rewrite(outReq)

View file

@ -1,20 +1,25 @@
package forward
const (
XForwardedProto = "X-Forwarded-Proto"
XForwardedFor = "X-Forwarded-For"
XForwardedHost = "X-Forwarded-Host"
XForwardedServer = "X-Forwarded-Server"
Connection = "Connection"
KeepAlive = "Keep-Alive"
ProxyAuthenticate = "Proxy-Authenticate"
ProxyAuthorization = "Proxy-Authorization"
Te = "Te" // canonicalized version of "TE"
Trailers = "Trailers"
TransferEncoding = "Transfer-Encoding"
Upgrade = "Upgrade"
ContentLength = "Content-Length"
ContentType = "Content-Type"
XForwardedProto = "X-Forwarded-Proto"
XForwardedFor = "X-Forwarded-For"
XForwardedHost = "X-Forwarded-Host"
XForwardedServer = "X-Forwarded-Server"
Connection = "Connection"
KeepAlive = "Keep-Alive"
ProxyAuthenticate = "Proxy-Authenticate"
ProxyAuthorization = "Proxy-Authorization"
Te = "Te" // canonicalized version of "TE"
Trailers = "Trailers"
TransferEncoding = "Transfer-Encoding"
Upgrade = "Upgrade"
ContentLength = "Content-Length"
ContentType = "Content-Type"
SecWebsocketKey = "Sec-Websocket-Key"
SecWebsocketVersion = "Sec-Websocket-Version"
SecWebsocketExtensions = "Sec-Websocket-Extensions"
SecWebsocketProtocol = "Sec-Websocket-Protocol"
SecWebsocketAccept = "Sec-Websocket-Accept"
)
// Hop-by-hop headers. These are removed when sent to the backend.
@ -30,3 +35,19 @@ var HopHeaders = []string{
TransferEncoding,
Upgrade,
}
var WebsocketDialHeaders = []string{
Upgrade,
Connection,
SecWebsocketKey,
SecWebsocketVersion,
SecWebsocketExtensions,
SecWebsocketProtocol,
SecWebsocketAccept,
}
var WebsocketUpgradeHeaders = []string{
Upgrade,
Connection,
SecWebsocketAccept,
}