golang

golang tcp socket server

kimbs0301 2024. 7. 8. 22:22

golang으로 서버를 구성하면 저사양 서버 머신에서도 10k ~ 15k 동시 접속자를 처리할 수 있다.
수신 패킷 단편화 처리, 접속 클라이언트 관리, 파일 로그 등등 추가 기능을 포함하면 실무에서도 사용 가능하다.
개발툴은 vscode를 사용했다.

 

프로젝트 디렉터리 구조

더보기

tcp_socket_server.code-workspace
server
 - main.go
 - client.go
 - channelGroup.go
 - go.mod
client
 - main.go
 - client.go
 - go.mod

 

tcp_socket_server.code-workspace

{
	"folders": [
		{
			"path": "./server"
		},
		{
			"path": "./client"
		}
	],
	"settings": {
		"workbench.list.openMode": "doubleClick",
		"editor.minimap.enabled": false,
		"[go]": {
			"editor.formatOnSave": false
		},
		"go.useLanguageServer": true,
		"go.buildOnSave": "off",
		"go.lintOnSave": "off",
		"go.vetOnSave": "off",
		"go.buildFlags": [],
		"go.vetFlags": [],
		"go.useCodeSnippetsOnFunctionSuggest": false,
		"go.formatTool": "goreturns",
		"go.lintTool": "gometalinter",
		"go.lintFlags": ["--disable=all"],
		"go.formatFlags": [
			"--disable=all"
		]
	}
}

 

server/main.go

// Package main
package main

import (
	"bufio"
	"fmt"
	"net"
	"runtime"
	"sync/atomic"
)

// init prints a start-up marker and sets GOMAXPROCS to the number of
// logical CPUs reported by the runtime.
func init() {
	cores := runtime.NumCPU()
	fmt.Println("init")
	runtime.GOMAXPROCS(cores) // one scheduler slot per logical CPU
}

// main listens on 0.0.0.0:8080, starts the broadcast goroutine and then
// accepts connections forever. Every accepted connection becomes a Client
// with its own Read and Write goroutines.
func main() {
	// TCP Server Listening
	tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:8080")
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return
	}
	defer listener.Close()

	// NewChannelGroup
	channelGroup := NewChannelGroup()
	go channelGroup.RunBroadcastGoChannel()

	fmt.Println("Tcp Socket Server Startup")

	var number int32 // last socket id handed out
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Printf("Accept error: %s\n", err.Error())
			continue // next
		}

		socketId := int(atomic.AddInt32(&number, 1))

		fmt.Printf("socketId: %d, remoteAddr: %s\n", socketId, conn.RemoteAddr().String())

		client := &Client{
			socketId:     socketId,
			conn:         conn,
			readerBuf:    bufio.NewReaderSize(conn, 2048),
			writerBuf:    bufio.NewWriterSize(conn, 8192),
			sendStream:   make(chan []byte, 21),
			channelGroup: channelGroup,
		}

		// Register the client BEFORE its goroutines start. If Read or Write
		// failed first, Close0 would delete a not-yet-present map entry and
		// close sendStream; registering afterwards would leave a dead client
		// in the map and the next broadcast would panic with
		// "send on closed channel".
		channelGroup.clientBroadcastMapMutex.Lock()
		channelGroup.clientBroadcastMap[socketId] = client
		channelGroup.clientBroadcastMapMutex.Unlock()

		go client.Write()
		go client.Read()
	}
}

 

server/client.go

// Package main
package main

import (
	"bufio"
	//"bytes"
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
)

// Read forwards every chunk received from the connection to the group's
// broadcast channel. On the first read error it logs the reason, stops
// and closes the client.
func (c *Client) Read() {
	for {
		// A fresh buffer per read: the slice is handed to the broadcaster,
		// so it must not be overwritten by the next read.
		buf := make([]byte, 2048)
		n, err := c.readerBuf.Read(buf)
		if err == nil {
			c.channelGroup.broadcastStream <- buf[:n]
			continue
		}

		switch {
		case err == io.EOF:
			fmt.Println("Read close ", c.socketId)
		case strings.HasSuffix(err.Error(), "i/o timeout"):
			fmt.Println("Read close i/o timeout ", c.socketId)
		default:
			fmt.Println("Read error ", c.socketId, err.Error())
		}
		break // leave the read loop; the goroutine ends below
	}

	c.Close0()
}

// Write sends every packet queued on sendStream to the connection and
// flushes after each one. It stops when sendStream is closed or a
// write/flush fails, then closes the client.
func (c *Client) Write() {
	channelClosed := true // stays true only if the loop ends because sendStream was closed

	for packet := range c.sendStream { // server -> client
		if _, err := c.writerBuf.Write(packet); err != nil {
			fmt.Printf("Write buffer write error: %s\n", err.Error())
			channelClosed = false
			break
		}
		if err := c.writerBuf.Flush(); err != nil {
			fmt.Printf("Write buffer flush error: %s\n", err.Error())
			channelClosed = false
			break
		}
	}

	if channelClosed { // socket close
		fmt.Println("Write channel disabled", c.socketId)
	}

	c.Close0()
}

// Close0 tears the client down exactly once: it removes the client from
// the broadcast map, closes the TCP connection and closes sendStream.
// Later calls are no-ops.
func (c *Client) Close0() {
	teardown := func() {
		group := c.channelGroup

		// Unregister first, under the map lock.
		group.clientBroadcastMapMutex.Lock()
		delete(group.clientBroadcastMap, c.socketId)
		group.clientBroadcastMapMutex.Unlock()

		c.conn.Close()      // tcp socket close
		close(c.sendStream) // go channel close
		fmt.Println("Close", c.socketId)
	}

	c.onceClose.Do(teardown)
}

// Client is the server-side state of one accepted TCP connection.
type Client struct {
	// socketId is the key under which the client is stored in
	// ChannelGroup.clientBroadcastMap.
	socketId     int
	// conn is the accepted connection; Close0 closes it.
	conn         net.Conn
	// readerBuf is the buffered reader that Read pulls packets from.
	readerBuf    *bufio.Reader
	// writerBuf is the buffered writer that Write writes to and flushes.
	writerBuf    *bufio.Writer
	// sendStream queues packets for Write; Close0 closes it.
	sendStream   chan []byte
	// channelGroup is the group whose broadcastStream receives what Read reads.
	channelGroup *ChannelGroup
	// onceClose makes the body of Close0 run only once.
	onceClose    sync.Once
}

 

server/channelGroup.go

// Package main
package main

import (
	"fmt"
	"sync"
	"time"
)

// NewChannelGroup returns a ChannelGroup with a buffered broadcast channel
// (capacity 21) and a client map pre-sized for 15000 entries.
func NewChannelGroup() *ChannelGroup {
	group := new(ChannelGroup)
	group.broadcastStream = make(chan []byte, 21)           // Buffered Go Channel
	group.clientBroadcastMap = make(map[int]*Client, 15000) // socketId key
	return group
}

// RunBroadcastGoChannel delivers every packet received on broadcastStream
// to the sendStream of each registered client. It runs until
// broadcastStream is closed and logs any broadcast pass that takes longer
// than 10 ms.
//
// Sends are non-blocking. The map mutex is held for the whole pass, so a
// blocking send to one client with a full queue would stall every other
// client, and would deadlock if that client's Write goroutine had failed
// and was waiting in Close0 for the same mutex. A client whose queue is
// full has its connection closed instead; its own Read/Write goroutines
// then notice the error and run Close0.
func (c *ChannelGroup) RunBroadcastGoChannel() {
	for packet := range c.broadcastStream {
		start := time.Now()

		c.clientBroadcastMapMutex.Lock()
		for _, client := range c.clientBroadcastMap {
			select {
			case client.sendStream <- packet:
			default:
				// Dropping a chunk would corrupt the byte stream for this
				// client, so disconnect it rather than skip the packet.
				fmt.Println("Broadcast send queue full, closing", client.socketId)
				client.conn.Close()
			}
		}
		c.clientBroadcastMapMutex.Unlock()

		if elapsedTime := time.Since(start).Milliseconds(); elapsedTime > 10 {
			fmt.Println("elapsedTime:", elapsedTime)
		}
	}

	fmt.Println("Broadcast go channel exit")
}

// ChannelGroup holds the set of connected clients and the channel through
// which packets are fanned out to all of them.
type ChannelGroup struct {
	broadcastStream         chan []byte     // packets waiting to be broadcast to every client
	clientBroadcastMap      map[int]*Client // clients that receive broadcasts, keyed by socketId
	// clientBroadcastMapMutex guards clientBroadcastMap.
	clientBroadcastMapMutex sync.Mutex
}

 

client/main.go

// Package main
package main

import (
	"bufio"
	"fmt"
	"net"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

// init prints a start-up marker and sets GOMAXPROCS to the number of
// logical CPUs reported by the runtime.
func init() {
	cores := runtime.NumCPU()
	fmt.Println("init")
	runtime.GOMAXPROCS(cores) // one scheduler slot per logical CPU
}

// main opens up to 100 connections to 127.0.0.1:8080, one per second.
// Each connection gets Read and Write goroutines plus a LoopWrite
// goroutine that generates traffic. main returns once every LoopWrite
// has finished or its connection attempt has failed.
func main() {
	const connectAddr = "127.0.0.1:8080"

	var (
		wg     sync.WaitGroup
		number int32
	)

	for attempt := 0; attempt < 100; attempt++ {
		socketId := int(atomic.AddInt32(&number, 1))
		wg.Add(1)

		conn, err := net.DialTimeout("tcp", connectAddr, 10*time.Second)
		if err != nil {
			fmt.Println("conn error")
			wg.Done() // nothing will run for this slot
			continue
		}
		fmt.Println(conn.RemoteAddr().String())

		client := &Client{
			socketId:   socketId,
			conn:       conn,
			readerBuf:  bufio.NewReaderSize(conn, 8192),
			writerBuf:  bufio.NewWriterSize(conn, 2048),
			sendStream: make(chan []byte, 21),
		}

		go client.Write()
		go client.Read()

		time.Sleep(time.Second)

		go client.LoopWrite(&wg)
	}

	wg.Wait()
	fmt.Println("complete")
}

// Message is the JSON body of the packets the test client sends.
type Message struct {
	// Timestamp is set by LoopWrite to time.Now().UnixNano() / 100.
	Timestamp int64  `json:"timestamp"`
	// Text is the message payload; LoopWrite sends "hello".
	Text      string `json:"text"`
}

 

client/client.go

// Package main
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net"
	"sync"
	"time"
)

// Read drains the connection and prints the size of every chunk received.
// It stops on the first read error and then closes the client.
func (c *Client) Read() {
	// The received bytes are only counted, never retained, so a single
	// buffer is reused instead of allocating 2 KiB on every iteration.
	readByteSlice := make([]byte, 2048)

	for {
		n, err := c.readerBuf.Read(readByteSlice)
		if err != nil {
			break
		}

		fmt.Println("read Size:", n)
	}

	c.Close0()
}

// Write sends every packet queued on sendStream to the connection and
// flushes after each one. It stops when sendStream is closed or when a
// write or flush fails, then closes the client.
func (c *Client) Write() {
	for {
		packet, ok := <-c.sendStream
		if !ok {
			fmt.Println("Write channel disabled")
			break
		}

		if _, err := c.writerBuf.Write(packet); err != nil {
			fmt.Println(err.Error())
			break
		}
		// A failed flush means the packet never reached the socket; stop
		// here instead of silently continuing with a broken writer.
		if err := c.writerBuf.Flush(); err != nil {
			fmt.Println(err.Error())
			break
		}
	}

	c.Close0()
}

// LoopWrite queues up to 1000 packets on sendStream, one every 100 ms, and
// calls wg.Done when it returns.
//
// Packet layout: 1 byte start marker (0x07), 4 bytes big-endian body
// length, then the JSON-encoded Message.
//
// If the connection has been closed in the meantime (Close0 closes
// sendStream), the loop stops instead of panicking with
// "send on closed channel", and wg.Done still runs.
func (c *Client) LoopWrite(wg *sync.WaitGroup) {
	defer wg.Done()

	// send reports false when sendStream has been closed by Close0.
	// A send on a closed channel panics; recover turns that into a
	// normal "stop" signal.
	send := func(packet []byte) (delivered bool) {
		defer func() {
			if recover() != nil {
				delivered = false
			}
		}()
		c.sendStream <- packet
		return true
	}

	for i := 0; i < 1000; i++ {
		time.Sleep(time.Millisecond * 100)

		message := Message{
			Timestamp: time.Now().UnixNano() / 100, // 100 ns units, 17 digits
			Text:      "hello",
		}
		jsonSlice, err := json.Marshal(&message)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		startingPoint, bodySize := byte(0x07), len(jsonSlice)
		packet := bytes.NewBuffer(make([]byte, 0, 5+bodySize))
		// Writes to a bytes.Buffer never return an error.
		packet.WriteByte(startingPoint)                          // header startingPoint
		binary.Write(packet, binary.BigEndian, uint32(bodySize)) // header bodySize
		packet.Write(jsonSlice)                                  // body json

		if !send(packet.Bytes()) {
			fmt.Println("LoopWrite stopped: connection closed", c.socketId)
			return
		}
		// fmt.Println("write", c.socketId)
	}
}

// Close0 closes the TCP connection and then sendStream. The work happens
// only on the first call; later calls are no-ops.
func (c *Client) Close0() {
	shutdown := func() {
		c.conn.Close()      // tcp socket close
		close(c.sendStream) // go channel close
	}

	c.onceClose.Do(shutdown)
}

// Client is the test client's state for one connection to the server.
type Client struct {
	// socketId is the sequence number main assigned to this connection.
	socketId   int
	// conn is the dialed connection; Close0 closes it.
	conn       net.Conn
	// readerBuf is the buffered reader that Read pulls data from.
	readerBuf  *bufio.Reader
	// writerBuf is the buffered writer that Write writes to and flushes.
	writerBuf  *bufio.Writer
	sendStream chan []byte // buffered channel of packets queued by LoopWrite for Write; Close0 closes it
	// onceClose makes the body of Close0 run only once.
	onceClose  sync.Once
}

 

golang 1.18 버전에 제네릭이 추가되었고, 1.18 ~ 1.19 버전에서는 런타임 성능 향상도 있었다.

빌드를 1.21 버전으로 해도 정상 동작한다.

 

서버 빌드

더보기

// windows
set GOROOT=C:\tools\go\1.21.12
C:\tools\go\1.21.12\bin\go.exe mod init my.project/tcpsocket
C:\tools\go\1.21.12\bin\go.exe build -v -o server.exe

 

클라이언트 빌드

더보기

// macOS, linux
export GOROOT=/tools/go/1.21.12
/tools/go/1.21.12/bin/go mod init my.project/tcpsocket
/tools/go/1.21.12/bin/go build -v -o server

 

'golang' 카테고리의 다른 글

golang UTC 타임존 변경  (0) 2024.07.08
golang uuid  (0) 2024.07.08
golang byte slice pointer  (0) 2024.07.05
golang 함수 문자열 포인터 파라미터  (0) 2024.07.05
golang init 함수 호출 시점  (0) 2024.07.05