Golang으로 서버를 구성하면 저사양 서버 머신에서도 10k ~ 15k 동시 접속자를 처리할 수 있다.
수신 패킷 단편화 처리, 접속 클라이언트 관리, 파일 로그 등의 추가 기능을 포함하면 실무에서도 사용할 수 있다.
개발툴은 vscode를 사용했다.
프로젝트 디렉터리 구조
tcp_socket_server.code-workspace
server
- main.go
- client.go
- channelGroup.go
- go.mod
client
- main.go
- client.go
- go.mod
tcp_socket_server.code-workspace
{
"folders": [
{
"path": "./server"
},
{
"path": "./client"
}
],
"settings": {
"workbench.list.openMode": "doubleClick",
"editor.minimap.enabled": false,
"[go]": {
"editor.formatOnSave": false
},
"go.useLanguageServer": true,
"go.buildOnSave": "off",
"go.lintOnSave": "off",
"go.vetOnSave": "off",
"go.buildFlags": [],
"go.vetFlags": [],
"go.useCodeSnippetsOnFunctionSuggest": false,
"go.formatTool": "goreturns",
"go.lintTool": "gometalinter",
"go.lintFlags": ["--disable=all"],
"go.formatFlags": [
"--disable=all"
]
}
}
server/main.go
// Package main
package main
import (
"bufio"
"fmt"
"net"
"runtime"
"sync/atomic"
)
// init runs before main. It prints a startup marker and sets GOMAXPROCS to
// the number of logical CPUs reported by the runtime.
func init() {
	fmt.Println("init")

	cpuCount := runtime.NumCPU()
	runtime.GOMAXPROCS(cpuCount)
}
// main starts the TCP broadcast server on 0.0.0.0:8080.
//
// It creates the channel group, starts its broadcast goroutine and then
// accepts connections forever. Every accepted connection gets a sequential
// socket id, a Client with buffered reader/writer, and two goroutines
// (Write and Read).
//
// The client is registered in the broadcast map BEFORE its goroutines are
// started. If it were registered afterwards, a connection that fails right
// away could run Close0 (which unregisters the client and closes sendStream)
// before the registration happened; the already-closed client would then stay
// in the map forever and the next broadcast would panic with
// "send on closed channel".
func main() {
	// Resolve and open the listening socket.
	tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:8080")
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		fmt.Printf("%s\n", err.Error())
		return
	}
	defer listener.Close()

	// Start the broadcaster shared by all clients.
	channelGroup := NewChannelGroup()
	go channelGroup.RunBroadcastGoChannel()
	fmt.Println("Tcp Socket Server Startup")

	var number int32 // socket id counter
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Printf("Accept error: %s\n", err.Error())
			continue // keep serving the next connection
		}

		socketId := int(atomic.AddInt32(&number, 1))
		fmt.Printf("socketId: %d, remoteAddr: %s\n", socketId, conn.RemoteAddr().String())

		client := &Client{
			socketId:     socketId,
			conn:         conn,
			readerBuf:    bufio.NewReaderSize(conn, 2048),
			writerBuf:    bufio.NewWriterSize(conn, 8192),
			sendStream:   make(chan []byte, 21),
			channelGroup: channelGroup,
		}

		// Register first, so that Close0 always finds the entry it has to remove.
		channelGroup.clientBroadcastMapMutex.Lock()
		channelGroup.clientBroadcastMap[socketId] = client
		channelGroup.clientBroadcastMapMutex.Unlock()

		go client.Write()
		go client.Read()
	}
}
server/client.go
// Package main
package main
import (
"bufio"
//"bytes"
"fmt"
"io"
"net"
"strings"
"sync"
)
// Read receives data from the connection and pushes every chunk onto the
// channel group's broadcastStream. It runs until a read fails, logs the
// reason, and then closes the client through Close0.
//
// A new buffer is allocated for each read because the resulting slice is sent
// on broadcastStream and therefore must not be overwritten by the next read.
func (c *Client) Read() {
	for {
		buf := make([]byte, 2048)
		size, err := c.readerBuf.Read(buf) // read the next chunk
		if err == nil {
			c.channelGroup.broadcastStream <- buf[:size]
			continue
		}

		switch {
		case err == io.EOF:
			fmt.Println("Read close ", c.socketId)
		case strings.HasSuffix(err.Error(), "i/o timeout"):
			fmt.Println("Read close i/o timeout ", c.socketId)
		default:
			fmt.Println("Read error ", c.socketId, err.Error())
		}

		// Any read error ends this goroutine.
		c.Close0()
		return
	}
}
// Write drains sendStream (server -> client) and writes every packet to the
// connection, flushing the buffered writer after each packet. It stops when
// sendStream is closed or when a write/flush fails, and then closes the
// client through Close0.
func (c *Client) Write() {
	failed := false
	for packet := range c.sendStream {
		if _, err := c.writerBuf.Write(packet); err != nil {
			fmt.Printf("Write buffer write error: %s\n", err.Error())
			failed = true
			break
		}
		if err := c.writerBuf.Flush(); err != nil {
			fmt.Printf("Write buffer flush error: %s\n", err.Error())
			failed = true
			break
		}
	}
	if !failed {
		// The loop ended because sendStream was closed (socket close).
		fmt.Println("Write channel disabled", c.socketId)
	}
	c.Close0()
}
// Close0 tears the client down exactly once, no matter how many goroutines
// call it: the client is removed from the channel group's broadcast map
// (under the map mutex), then the TCP connection and the send channel are
// closed, and finally a "Close" line is logged.
func (c *Client) Close0() {
	teardown := func() {
		group := c.channelGroup

		group.clientBroadcastMapMutex.Lock()
		delete(group.clientBroadcastMap, c.socketId) // unregister
		group.clientBroadcastMapMutex.Unlock()

		c.conn.Close()      // close the TCP socket
		close(c.sendStream) // close the Go channel
		fmt.Println("Close", c.socketId)
	}
	c.onceClose.Do(teardown)
}
// Client is the server-side state of one accepted TCP connection.
type Client struct {
	// socketId is the key under which the client is stored in the broadcast map.
	socketId int
	// conn is the underlying TCP connection.
	conn net.Conn
	// readerBuf is the buffered reader used by Read.
	readerBuf *bufio.Reader
	// writerBuf is the buffered writer used by Write.
	writerBuf *bufio.Writer
	// sendStream queues packets going from the server to this client.
	sendStream chan []byte
	// channelGroup is the broadcast group this client belongs to.
	channelGroup *ChannelGroup
	// onceClose makes Close0 run its teardown only once.
	onceClose sync.Once
}
server/channelGroup.go
// Package main
package main
import (
"fmt"
"sync"
"time"
)
// NewChannelGroup returns an empty ChannelGroup with a buffered broadcast
// channel (capacity 21) and a client map, keyed by socket id, pre-sized for
// 15000 entries.
func NewChannelGroup() *ChannelGroup {
	group := new(ChannelGroup)
	group.broadcastStream = make(chan []byte, 21)
	group.clientBroadcastMap = make(map[int]*Client, 15000)
	return group
}
// RunBroadcastGoChannel forwards every packet received on broadcastStream to
// the sendStream of each registered client. It blocks until broadcastStream
// is closed, so it is meant to run in its own goroutine. A broadcast taking
// longer than 10 ms is reported on stdout.
//
// The set of clients is copied while holding clientBroadcastMapMutex, and the
// (possibly blocking) channel sends are done after the mutex is released.
// Sending while holding the mutex can deadlock the whole server: when a
// client's writer goroutine has stopped and its sendStream is full, the
// broadcaster blocks with the mutex held, and Close0 - which needs the same
// mutex to unregister that client - can never run.
func (c *ChannelGroup) RunBroadcastGoChannel() {
	var targets []*Client // reused for every packet to avoid reallocating
	for packet := range c.broadcastStream {
		startTime := time.Now()

		// Snapshot the current clients; keep the critical section short.
		c.clientBroadcastMapMutex.Lock()
		targets = targets[:0]
		for _, client := range c.clientBroadcastMap {
			targets = append(targets, client)
		}
		c.clientBroadcastMapMutex.Unlock()

		for i, client := range targets {
			// A client may have been closed since the snapshot was taken;
			// deliverPacket simply skips it in that case.
			deliverPacket(client.sendStream, packet)
			targets[i] = nil // do not keep closed clients reachable
		}

		// time.Since uses the monotonic clock, so wall-clock jumps cannot
		// distort the measurement.
		if elapsedTime := time.Since(startTime).Milliseconds(); elapsedTime > 10 {
			fmt.Println("elapsedTime:", elapsedTime)
		}
	}
	fmt.Println("Broadcast go channel exit")
}

// deliverPacket sends packet on stream, blocking while the stream is full.
// It reports false instead of panicking when the stream is already closed or
// gets closed while the send is blocked.
func deliverPacket(stream chan<- []byte, packet []byte) (delivered bool) {
	defer func() {
		// The only panic possible here is "send on closed channel".
		if recover() != nil {
			delivered = false
		}
	}()
	stream <- packet
	return true
}
// ChannelGroup holds the state shared by all connected clients for
// broadcasting.
type ChannelGroup struct {
	// broadcastStream carries the packets that have to be broadcast.
	broadcastStream chan []byte
	// clientBroadcastMap holds the clients that receive broadcasts, keyed by
	// socket id.
	clientBroadcastMap map[int]*Client
	// clientBroadcastMapMutex guards clientBroadcastMap.
	clientBroadcastMapMutex sync.Mutex
}
client/main.go
// Package main
package main
import (
"bufio"
"fmt"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
)
// init runs before main. It prints a startup marker and sets GOMAXPROCS to
// the number of logical CPUs reported by the runtime.
func init() {
	fmt.Println("init")

	logicalCPUs := runtime.NumCPU()
	runtime.GOMAXPROCS(logicalCPUs)
}
// main is the load-test client. It makes 100 connection attempts to
// 127.0.0.1:8080, one after another. For every successful connection it
// starts the Write and Read goroutines, waits one second and then starts
// LoopWrite. It prints "complete" once every LoopWrite has finished (failed
// connection attempts are counted as finished immediately).
func main() {
	const connectAddr = "127.0.0.1:8080"

	var (
		wg     sync.WaitGroup
		number int32 // socket id counter
	)

	for attempt := 0; attempt < 100; attempt++ {
		socketId := int(atomic.AddInt32(&number, 1))
		wg.Add(1)

		conn, err := net.DialTimeout("tcp", connectAddr, 10*time.Second)
		if err != nil {
			fmt.Println("conn error")
			wg.Done() // nothing will run for this attempt
			continue
		}
		fmt.Println(conn.RemoteAddr().String())

		client := &Client{
			socketId:   socketId,
			conn:       conn,
			readerBuf:  bufio.NewReaderSize(conn, 8192),
			writerBuf:  bufio.NewWriterSize(conn, 2048),
			sendStream: make(chan []byte, 21),
		}
		go client.Write()
		go client.Read()

		time.Sleep(1 * time.Second)
		go client.LoopWrite(&wg)
	}

	wg.Wait()
	fmt.Println("complete")
}
// Message is the JSON payload the test client puts into every packet body.
type Message struct {
	// Timestamp is encoded under the JSON key "timestamp".
	Timestamp int64 `json:"timestamp"`
	// Text is encoded under the JSON key "text".
	Text string `json:"text"`
}
client/client.go
// Package main
package main
import (
"bufio"
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"sync"
"time"
)
// Read consumes everything the server sends and prints the size of each
// chunk. It runs until a read fails and then closes the client through
// Close0.
func (c *Client) Read() {
	// The received bytes are only counted and never retained, so a single
	// buffer can be reused for every read instead of allocating 2 KiB per
	// iteration.
	readByteSlice := make([]byte, 2048)
	for {
		n, err := c.readerBuf.Read(readByteSlice)
		if err != nil {
			break
		}
		fmt.Println("read Size:", n)
	}
	c.Close0()
}
// Write drains sendStream and writes every packet to the connection,
// flushing the buffered writer after each packet. It stops when sendStream
// is closed or when a write or flush fails, and then closes the client
// through Close0.
func (c *Client) Write() {
	for {
		packet, ok := <-c.sendStream
		if !ok {
			fmt.Println("Write channel disabled")
			break
		}
		if _, err := c.writerBuf.Write(packet); err != nil {
			fmt.Println(err.Error())
			break
		}
		// A failed flush means the connection is unusable. Stop right away
		// instead of waiting for the next packet to surface the error.
		if err := c.writerBuf.Flush(); err != nil {
			fmt.Println(err.Error())
			break
		}
	}
	c.Close0()
}
// LoopWrite queues up to 1000 messages on sendStream, one every 100 ms, and
// calls wg.Done when it returns.
//
// Each packet consists of a 5-byte header (start marker 0x07 followed by the
// body size as a big-endian uint32) and the JSON-encoded Message as body.
//
// The loop stops early when the client has been closed. Close0 closes
// sendStream, and sending on a closed channel would otherwise panic and take
// the whole process down.
func (c *Client) LoopWrite(wg *sync.WaitGroup) {
	// Deferred so the WaitGroup is released on every exit path.
	defer wg.Done()

	for i := 0; i < 1000; i++ {
		time.Sleep(time.Millisecond * 100)

		message := Message{}
		message.Timestamp = time.Now().UnixNano() / 100 // 17 digits
		message.Text = "hello"
		jsonSlice, err := json.Marshal(&message)
		if err != nil {
			fmt.Println("LoopWrite marshal error:", err.Error())
			return
		}

		if !c.enqueue(framePacket(jsonSlice)) {
			fmt.Println("LoopWrite stopped, connection closed:", c.socketId)
			return
		}
	}
}

// framePacket prepends the 5-byte header (0x07 + big-endian uint32 body size)
// to body and returns the complete packet.
func framePacket(body []byte) []byte {
	var header [5]byte
	header[0] = 0x07 // start marker
	binary.BigEndian.PutUint32(header[1:], uint32(len(body)))

	packet := bytes.NewBuffer(make([]byte, 0, len(header)+len(body)))
	packet.Write(header[:]) // bytes.Buffer writes never return an error
	packet.Write(body)
	return packet.Bytes()
}

// enqueue sends packet on sendStream, blocking while the queue is full. It
// reports false instead of panicking when sendStream is already closed or
// gets closed while the send is blocked.
func (c *Client) enqueue(packet []byte) (queued bool) {
	defer func() {
		// The only panic possible here is "send on closed channel".
		if recover() != nil {
			queued = false
		}
	}()
	c.sendStream <- packet
	return true
}
// Close0 closes the TCP connection and the send channel. The work is done
// only on the first call; later calls are no-ops.
func (c *Client) Close0() {
	shutdown := func() {
		c.conn.Close()      // close the TCP socket
		close(c.sendStream) // close the Go channel
	}
	c.onceClose.Do(shutdown)
}
// Client is the state of one test connection to the server.
type Client struct {
	// socketId identifies the connection within the test run.
	socketId int
	// conn is the underlying TCP connection.
	conn net.Conn
	// readerBuf is the buffered reader used by Read.
	readerBuf *bufio.Reader
	// writerBuf is the buffered writer used by Write.
	writerBuf *bufio.Writer
	// sendStream is the buffered channel of packets waiting to be written.
	sendStream chan []byte
	// onceClose makes Close0 run its teardown only once.
	onceClose sync.Once
}
golang 1.18 버전에 제네릭이 추가되었고, 1.18, 1.19 버전에서 성능 향상이 있었다.
빌드를 1.21 버전으로 해도 정상 동작한다.
서버 빌드
// windows
set GOROOT=C:\tools\go\1.21.12
C:\tools\go\1.21.12\bin\go.exe mod init my.project/tcpsocket
C:\tools\go\1.21.12\bin\go.exe build -v -o server.exe
클라이언트 빌드
// macOS, linux
export GOROOT=/tools/go/1.21.12
/tools/go/1.21.12/bin/go mod init my.project/tcpsocket
/tools/go/1.21.12/bin/go build -v -o client
'golang' 카테고리의 다른 글
golang UTC 타임존 변경 (0) | 2024.07.08 |
---|---|
golang uuid (0) | 2024.07.08 |
golang byte slice pointer (0) | 2024.07.05 |
golang 함수 문자열 포인터 파라미터 (0) | 2024.07.05 |
golang init 함수 호출 시점 (0) | 2024.07.05 |