import collections
import math
import random
import re
import shlex
import textwrap
from itertools import product
from typing import Dict, Optional, Sequence, Tuple, Union
import enet
from twisted.internet import reactor
from twisted.logger import Logger
from pyspades import contained as loaders
from pyspades import world
from pyspades.collision import collision_3d, vector_collision
from pyspades.common import Vertex3, get_color, make_color
from pyspades.constants import *
from pyspades.constants import (BLOCK_TOOL, CTF_MODE, ERROR_FULL,
ERROR_TOO_MANY_CONNECTIONS,
ERROR_WRONG_VERSION, FALL_KILL, HEAD,
HEADSHOT_KILL, HIT_TOLERANCE,
FOG_DISTANCE, MAX_BLOCK_DISTANCE, MAX_POSITION_RATE,
MELEE, MELEE_DISTANCE, MELEE_KILL,
RAPID_WINDOW_ENTRIES, SPADE_TOOL,
TC_CAPTURE_DISTANCE, TC_MODE, WEAPON_KILL,
WEAPON_TOOL)
from pyspades.mapgenerator import ProgressiveMapGenerator
from pyspades.packet import call_packet_handler, register_packet_handler
from pyspades.protocol import BaseConnection
from pyspades.team import Team
from pyspades.weapon import WEAPONS
from pyspades.types import RateLimiter
log = Logger()
tc_data = loaders.TCState()
[docs]def check_nan(*values) -> bool:
for value in values:
if math.isnan(value) or math.isinf(value):
return True
return False
[docs]def parse_command(value: str) -> Tuple[str, Sequence[str]]:
try:
splitted = shlex.split(value)
except ValueError:
# shlex failed. let's just split per space
splitted = value.split(' ')
if splitted:
command = splitted.pop(0)
else:
command = ''
return command, splitted
[docs]class ServerConnection(BaseConnection):
address = None # Tuple[int, int]
player_id = None
map_packets_sent = 0
team = None # type: Team
weapon = None
weapon_object = None
name = None
kills = 0
hp = None
tool = None
color = (0x70, 0x70, 0x70)
grenades = None
blocks = None
spawn_call = None
respawn_time = None
saved_loaders = None
last_refill = None
last_block_destroy = None
filter_visibility_data = False
filter_animation_data = False
freeze_animation = False
filter_weapon_input = False
speedhack_detect = False
rubberband_distance = 10
rapid_hack_detect = False
timers = None
world_object = None # type: world.Character
last_block = None
map_data = None
last_position_update = None
local = False
def __init__(self, *arg, **kw) -> None:
BaseConnection.__init__(self, *arg, **kw)
protocol = self.protocol
address = self.peer.address
self.total_blocks_removed = 0
self.address = (address.host, address.port)
self.respawn_time = protocol.respawn_time
self.rapids = RateLimiter(RAPID_WINDOW_ENTRIES, MAX_RAPID_SPEED)
self.client_info = {}
self.proto_extensions = {} # type: Dict[int, int]
self.line_build_start_pos = None
[docs] def on_connect(self) -> None:
if self.local:
return
if self.peer.eventData != self.protocol.version:
log.debug("{player} kicked: wrong protocol version {version}",
player=self, version=self.peer.eventData)
self.disconnect(ERROR_WRONG_VERSION)
return
max_players = min(32, self.protocol.max_players)
if len(self.protocol.connections) > max_players:
self.disconnect(ERROR_FULL)
return
if self.protocol.max_connections_per_ip:
shared = [conn for conn in
self.protocol.connections.values()
if conn.address[0] == self.address[0]]
if len(shared) > self.protocol.max_connections_per_ip:
self.disconnect(ERROR_TOO_MANY_CONNECTIONS)
return
if not self.disconnected:
log.debug("sending map data to {player}", player=self)
self._connection_ack()
[docs] def loader_received(self, loader: enet.Packet) -> None:
"""
called when a loader i.e. packet is received.
calls the packet handler registered with
@register_packet_handler
"""
if self.player_id is None:
return
call_packet_handler(self, loader)
[docs] @register_packet_handler(loaders.ProtocolExtensionInfo)
def on_ext_info_received(self, contained: loaders.ProtocolExtensionInfo) -> None:
self.proto_extensions = dict(contained.extensions)
log.debug("received extinfo {extinfo} from {player}",
extinfo=self.proto_extensions,
player=self)
[docs] @register_packet_handler(loaders.ExistingPlayer)
@register_packet_handler(loaders.ShortPlayerData)
def on_new_player_recieved(self, contained: loaders.ExistingPlayer) -> None:
if self.team is not None and not self.team.spectator:
# This player has already joined the game as a full player.
# Existingplayer may only be sent if in the limbo or spectator
# modes. Without this check, they could respawn themselves
# instantly on any team they wanted.
log.debug("{} tried sending an ExistingPlayer packet while not in"
" limbo or spectator mode".format(self))
return
old_team = self.team
team = self.protocol.teams[contained.team]
log.debug("{user} wants to join {team}",
user=self, team=team, teamid=contained.team)
ret = self.on_team_join(team)
if ret is False:
team = self.protocol.team_spectator
elif ret is not None:
team = ret
self.team = team
if self.name is None:
name = contained.name
self.name = self.protocol.get_name(name)
self.protocol.players[self.player_id] = self
self.on_login(self.name)
else:
self.on_team_changed(old_team)
self.set_weapon(contained.weapon, True)
if self.protocol.speedhack_detect and not self.local:
self.speedhack_detect = True
if self.protocol.rubberband_distance is not None:
self.rubberband_distance = self.protocol.rubberband_distance
if not self.local:
self.rapid_hack_detect = True
if team.spectator:
if self.world_object is not None:
self.world_object.delete()
self.world_object = None
# send kill packets for dead players
for player in self.protocol.players.values():
if (player.player_id != self.player_id and player.world_object
and player.world_object.dead):
kill_action = loaders.KillAction()
kill_action.killer_id = player.player_id
kill_action.player_id = player.player_id
kill_action.kill_type = FALL_KILL
self.send_contained(kill_action)
self.spawn()
[docs] @register_packet_handler(loaders.OrientationData)
def on_orientation_update_recieved(self, contained: loaders.OrientationData) -> None:
if not self.hp:
return
x, y, z = contained.x, contained.y, contained.z
if check_nan(x, y, z):
self.on_hack_attempt(
'Invalid orientation data received')
return
returned = self.on_orientation_update(x, y, z)
if returned == False:
return
if returned is not None:
x, y, z = returned
self.world_object.set_orientation(x, y, z)
[docs] @register_packet_handler(loaders.PositionData)
def on_position_update_recieved(self, contained: loaders.PositionData) -> None:
if not self.hp:
return
current_time = reactor.seconds()
last_update = self.last_position_update
self.last_position_update = current_time
if last_update is not None:
dt = current_time - last_update
if dt < MAX_POSITION_RATE:
self.set_location()
return
x, y, z = contained.x, contained.y, contained.z
if check_nan(x, y, z):
self.on_hack_attempt(
'Invalid position data received')
return
if not self.check_speedhack(x, y, z):
# vanilla behaviour
self.set_location()
return
if not self.freeze_animation:
self.world_object.set_position(x, y, z)
self.on_position_update()
if self.filter_visibility_data:
return
game_mode = self.protocol.game_mode
if game_mode == CTF_MODE:
other_flag = self.team.other.flag
if vector_collision(self.world_object.position,
self.team.base):
if other_flag.player is self:
self.capture_flag()
self.check_refill()
if other_flag.player is None and vector_collision(
self.world_object.position, other_flag):
self.take_flag()
elif game_mode == TC_MODE:
for entity in self.protocol.entities:
collides = vector_collision(
entity, self.world_object.position, TC_CAPTURE_DISTANCE)
if self in entity.players:
if not collides:
entity.remove_player(self)
else:
if collides:
entity.add_player(self)
if collides and vector_collision(entity,
self.world_object.position):
self.check_refill()
[docs] @register_packet_handler(loaders.WeaponReload)
def on_reload_recieved(self, contained) -> None:
if not self.hp:
return
self.weapon_object.reload()
if self.filter_animation_data:
return
contained.player_id = self.player_id
self.protocol.broadcast_contained(contained, sender=self)
[docs] @register_packet_handler(loaders.HitPacket)
def on_hit_recieved(self, contained):
world_object = self.world_object
value = contained.value
is_melee = value == MELEE
if is_melee:
kill_type = MELEE_KILL
elif contained.value == HEAD:
kill_type = HEADSHOT_KILL
else:
kill_type = WEAPON_KILL
try:
player = self.protocol.players[contained.player_id]
except KeyError:
return
position1 = world_object.position
position2 = player.world_object.position
if is_melee:
hit_amount = self.protocol.melee_damage
else:
hit_amount = self.weapon_object.get_damage(
value, position1, position2)
self.on_unvalidated_hit(hit_amount, player, kill_type, None)
if not self.hp:
return
if not is_melee and self.weapon_object.is_empty():
return
valid_hit = world_object.validate_hit(player.world_object,
value, HIT_TOLERANCE,
self.rubberband_distance)
if not valid_hit:
return
if is_melee and not vector_collision(position1, position2,
MELEE_DISTANCE):
return
returned = self.on_hit(hit_amount, player, kill_type, None)
if returned == False:
return
elif returned is not None:
hit_amount = returned
player.hit(hit_amount, self, kill_type)
[docs] @register_packet_handler(loaders.GrenadePacket)
def on_grenade_recieved(self, contained: loaders.GrenadePacket) -> None:
if not self.hp:
return
if check_nan(contained.value, *contained.position) or check_nan(*contained.velocity):
self.on_hack_attempt("Invalid grenade data")
return
if not self.grenades:
return
self.grenades -= 1
if not self.check_speedhack(*contained.position):
contained.position = self.world_object.position.get()
if contained.value > 3.0:
contained.value = 3.0
velocity = Vertex3(*contained.velocity) - self.world_object.velocity
if velocity.length() > 2.0: # cap at tested maximum
velocity = velocity.normal() * 2.0
velocity += self.world_object.velocity
if self.on_grenade(contained.value) == False:
return
grenade = self.protocol.world.create_object(
world.Grenade, contained.value,
Vertex3(*contained.position), None,
velocity, self.grenade_exploded)
grenade.team = self.team
log.debug("{player!r} ({world_object!r}) created {grenade!r}",
grenade=grenade, world_object=self.world_object, player=self)
self.on_grenade_thrown(grenade)
if self.filter_visibility_data:
return
contained.player_id = self.player_id
self.protocol.broadcast_contained(contained, sender=self)
[docs] @register_packet_handler(loaders.SetColor)
def on_color_change_recieved(self, contained: loaders.SetColor) -> None:
if not self.hp:
return
color = get_color(contained.value)
if self.on_color_set_attempt(color) == False:
return
self.color = color
self.on_color_set(color)
if self.filter_animation_data:
return
contained.player_id = self.player_id
self.protocol.broadcast_contained(contained, sender=self, save=True)
[docs] @register_packet_handler(loaders.BlockAction)
def on_block_action_recieved(self, contained: loaders.BlockAction) -> None:
world_object = self.world_object
if not self.hp:
return
value = contained.value
if value == GRENADE_DESTROY:
return
if value == BUILD_BLOCK:
interval = TOOL_INTERVAL[BLOCK_TOOL]
elif self.tool == WEAPON_TOOL:
if self.weapon_object.is_empty():
return
interval = WEAPON_INTERVAL[self.weapon]
else:
interval = TOOL_INTERVAL[self.tool]
current_time = reactor.seconds()
last_time = self.last_block
self.last_block = current_time
if (self.rapid_hack_detect and last_time is not None and
current_time - last_time < interval):
self.rapids.record_event(current_time)
if self.rapids.above_limit():
log.info('RAPID HACK: {events}',
events=self.rapids.get_events())
self.on_hack_attempt('Rapid hack detected')
return
map_ = self.protocol.map
x = contained.x
y = contained.y
z = contained.z
if z >= 62:
return
if value == BUILD_BLOCK:
self.blocks -= 1
pos = world_object.position
if self.blocks < -BUILD_TOLERANCE:
return
elif not collision_3d(pos.x, pos.y, pos.z, x, y, z,
MAX_BLOCK_DISTANCE):
return
elif self.on_block_build_attempt(x, y, z) == False:
return
elif not map_.build_point(x, y, z, self.color):
return
self.on_block_build(x, y, z)
else:
if not map_.get_solid(x, y, z):
return
pos = world_object.position
if self.tool == SPADE_TOOL and not collision_3d(
pos.x, pos.y, pos.z, x, y, z, MAX_DIG_DISTANCE):
return
if self.on_block_destroy(x, y, z, value) == False:
return
elif value == DESTROY_BLOCK:
count = map_.destroy_point(x, y, z)
if count:
self.total_blocks_removed += count
self.blocks = min(50, self.blocks + 1)
self.on_block_removed(x, y, z)
elif value == SPADE_DESTROY:
for xyz in ((x, y, z), (x, y, z + 1), (x, y, z - 1)):
count = map_.destroy_point(*xyz)
if count:
self.total_blocks_removed += count
self.on_block_removed(*xyz)
self.last_block_destroy = reactor.seconds()
block_action = loaders.BlockAction()
block_action.x = x
block_action.y = y
block_action.z = z
block_action.value = contained.value
block_action.player_id = self.player_id
self.protocol.broadcast_contained(block_action, save=True)
self.protocol.update_entities()
[docs] @register_packet_handler(loaders.BlockLine)
def on_block_line_recieved(self, contained):
if not self.hp:
return # dead players can't build
if self.line_build_start_pos is None:
return
current_time = reactor.seconds()
last_time = self.last_block
self.last_block = current_time
if (self.rapid_hack_detect and last_time is not None and
current_time - last_time < TOOL_INTERVAL[BLOCK_TOOL]):
self.rapids.record_event(current_time)
if self.rapids.above_limit():
log.info('RAPID HACK: {events}',
events=self.rapids.get_events())
self.on_hack_attempt('Rapid hack detected')
return
map_ = self.protocol.map
x1, y1, z1 = (contained.x1, contained.y1, contained.z1)
x2, y2, z2 = (contained.x2, contained.y2, contained.z2)
pos = self.world_object.position
start_pos = self.line_build_start_pos
if (not map_.is_valid_position(x1, y1, z1)
or not map_.is_valid_position(x2, y2, z2)):
return # coordinates are out of bounds
# ensure that the player is currently within tolerance of the location
# that the line build ended at
if not collision_3d(pos.x, pos.y, pos.z, x2, y2, z2,
MAX_BLOCK_DISTANCE):
return
# ensure that the player was within tolerance of the location
# that the line build started at
if not collision_3d(start_pos.x, start_pos.y, start_pos.z, x1, y1, z1,
MAX_BLOCK_DISTANCE):
return
# check if block can be placed in that location
if not map_.has_neighbors(x1, y1, z1):
return
if not map_.has_neighbors(x2, y2, z2):
return
points = world.cube_line(x1, y1, z1, x2, y2, z2)
if not points:
return
if len(points) > (self.blocks + BUILD_TOLERANCE):
return
if self.on_line_build_attempt(points) is False:
return
for point in points:
x, y, z = point
if map_.get_solid(x, y, z):
continue
if not map_.build_point(x, y, z, self.color):
break
self.blocks -= len(points)
self.on_line_build(points)
contained.player_id = self.player_id
self.protocol.broadcast_contained(contained, save=True)
self.protocol.update_entities()
[docs] @register_packet_handler(loaders.ChatMessage)
def on_chat_message_recieved(self, contained: loaders.ChatMessage) -> None:
if not self.name:
return
value = contained.value
if len(value) > 108:
log.info("TOO LONG MESSAGE (%i chars) FROM %s (#%i)" %
(len(value), self.name, self.player_id))
value = value[:108]
if value.startswith('/'):
self.on_command(*parse_command(value[1:]))
else:
global_message = contained.chat_type == CHAT_ALL
result = self.on_chat(value, global_message)
if result == False:
return
elif result is not None:
value = result
contained.chat_type = CHAT_ALL if global_message else CHAT_TEAM
contained.value = value
contained.player_id = self.player_id
if global_message:
team = None
else:
team = self.team
for player in self.protocol.players.values():
if not player.deaf:
if team is None or team is player.team:
player.send_contained(contained)
self.on_chat_sent(value, global_message)
[docs] @register_packet_handler(loaders.FogColor)
def on_fog_color_recieved(self, contained):
# FIXME: this theoretically might anyone to set the fog...
# do we even need this?
if not self.name:
return
color = get_color(contained.color)
self.on_command('fog', [str(item) for item in color])
[docs] @register_packet_handler(loaders.ChangeWeapon)
def on_weapon_change_recieved(self, contained):
if not self.name:
return
if self.on_weapon_set(contained.weapon) == False:
return
self.weapon = contained.weapon
self.set_weapon(self.weapon)
[docs] @register_packet_handler(loaders.ChangeTeam)
def on_team_change_recieved(self, contained):
if not self.name:
return
team = self.protocol.teams[contained.team]
ret = self.on_team_join(team)
if ret is False:
return
team = ret or team
self.set_team(team)
[docs] @register_packet_handler(loaders.HandShakeReturn)
def on_handshake_recieved(self, contained: loaders.HandShakeReturn) -> None:
version_request = loaders.VersionRequest()
self.send_contained(version_request)
[docs] @register_packet_handler(loaders.VersionResponse)
def on_version_info_recieved(self, contained: loaders.VersionResponse) -> None:
self.client_info["version"] = contained.version
self.client_info["os_info"] = contained.os_info[:108]
# TODO: Make this a dict lookup instead
if contained.client == 'o':
self.client_info["client"] = "OpenSpades"
elif contained.client == 'B':
self.client_info["client"] = "BetterSpades"
# BetterSpades currently sends the client name in the OS info to
# deal with old scripts that don't recognize the 'B' indentifier
match = re.match(r"\ABetterSpades \((.*)\)\Z",
contained.os_info[:108])
if match:
self.client_info["os_info"] = match.groups()[0]
elif contained.client == 'a':
self.client_info["client"] = "ACE"
else:
self.client_info["client"] = "Unknown({})".format(contained.client)
# send extension info to clients that support this packet.
# skip openspades <= 0.1.3 https://github.com/piqueserver/piqueserver/issues/504
if contained.client == 'o' and contained.version <= (0, 1, 3):
log.debug("not sending version request to OpenSpades <= 0.1.3")
else:
ext_info = loaders.ProtocolExtensionInfo()
ext_info.extensions = []
self.send_contained(ext_info)
@property
def client_string(self):
client = self.client_info.get("client", "Unknown")
os = self.client_info.get("os_info", "Unknown")
version = self.client_info.get("version", None)
version_string = "Unknown" if version is None else ".".join(
map(str, version))
if client == os == version_string == "Unknown":
client = "Probably Voxlap"
os = "Windows"
version_string = "0.75"
return "{} v{} on {}".format(client, version_string, os)
[docs] def check_speedhack(self, x: float, y: float, z: float, distance: None = None) -> bool:
if not self.speedhack_detect:
return True
if distance is None:
distance = self.rubberband_distance
position = self.world_object.position
return (math.fabs(x - position.x) < distance and
math.fabs(y - position.y) < distance and
math.fabs(z - position.z) < distance)
# backwards compatability
is_valid_position = check_speedhack
[docs] def check_refill(self):
last_refill = self.last_refill
if (last_refill is None or
reactor.seconds() - last_refill > self.protocol.refill_interval):
self.last_refill = reactor.seconds()
if self.on_refill() != False:
self.refill()
[docs] def get_location(self):
position = self.world_object.position
return position.x, position.y, position.z
[docs] def is_location_free(self, x, y, z):
return (self.protocol.map.get_solid(x, y, z) == 0 and
self.protocol.map.get_solid(x, y, z + 1) == 0 and
self.protocol.map.get_solid(x, y, z + 2) == 0 and
self.protocol.map.get_solid(x, y, z + 3) == 1)
[docs] def set_location_safe(self, location, center=True):
x, y, z = location
if center:
x -= 0.5
y -= 0.5
z += 0.5
x = int(x)
y = int(y)
z = int(z)
# search for valid locations near the specified point
for pos in self.protocol.pos_table:
if self.is_location_free(x + pos[0], y + pos[1], z + pos[2]):
self.set_location((x + pos[0], y + pos[1], z + pos[2]))
return True
return False
[docs] def set_location(self, location=None):
if location is None:
# used for rubberbanding
position = self.world_object.position
x, y, z = position.x, position.y, position.z
else:
x, y, z = location
x += 0.5
y += 0.5
z -= 0.5
if self.world_object is not None:
self.world_object.set_position(x, y, z)
position_data = loaders.PositionData()
position_data.x = x
position_data.y = y
position_data.z = z
self.send_contained(position_data)
[docs] def refill(self, local: bool = False) -> None:
self.hp = 100
self.grenades = 3
self.blocks = 50
self.weapon_object.restock()
if not local:
restock = loaders.Restock()
self.send_contained(restock)
[docs] def respawn(self) -> None:
if self.spawn_call is None:
self.spawn_call = reactor.callLater(
self.get_respawn_time(), self.spawn)
[docs] def get_spawn_location(self) -> Tuple[int, int, int]:
game_mode = self.protocol.game_mode
if game_mode == TC_MODE:
try:
base = random.choice(list(self.team.get_entities()))
return base.get_spawn_location()
except IndexError:
pass
return self.team.get_random_location(True)
[docs] def get_respawn_time(self) -> float:
if not self.respawn_time:
return 0
if self.protocol.respawn_waves:
offset = reactor.seconds() % self.respawn_time
else:
offset = 0
return self.respawn_time - offset
[docs] def spawn(self, pos: None = None) -> None:
self.spawn_call = None
if self.team is None:
return
spectator = self.team.spectator
create_player = loaders.CreatePlayer()
if not spectator:
if pos is None:
x, y, z = self.get_spawn_location()
x += 0.5
y += 0.5
z -= 2.4
else:
x, y, z = pos
returned = self.on_spawn_location((x, y, z))
if returned is not None:
x, y, z = returned
if self.world_object is not None:
self.world_object.set_position(x, y, z, True)
else:
position = Vertex3(x, y, z)
self.world_object = self.protocol.world.create_object(
world.Character, position, None, self._on_fall)
self.world_object.dead = False
self.tool = WEAPON_TOOL
self.refill(True)
create_player.x = x
create_player.y = y
create_player.z = z
create_player.weapon = self.weapon
create_player.player_id = self.player_id
create_player.name = self.name
create_player.team = self.team.id
if self.filter_visibility_data and not spectator:
self.send_contained(create_player)
else:
self.protocol.broadcast_contained(create_player, save=True)
if not spectator:
self.on_spawn((x, y, z))
if not self.client_info:
handshake_init = loaders.HandShakeInit()
self.send_contained(handshake_init)
[docs] def take_flag(self):
if not self.hp:
return
flag = self.team.other.flag
if flag.player is not None:
return
if self.on_flag_take() == False:
return
flag.player = self
intel_pickup = loaders.IntelPickup()
intel_pickup.player_id = self.player_id
self.protocol.broadcast_contained(intel_pickup, save=True)
[docs] def capture_flag(self):
other_team = self.team.other
flag = other_team.flag
player = flag.player
if player is not self:
return
self.add_score(10) # 10 points for intel
self.team.score += 1
self.on_flag_capture()
if (self.protocol.max_score not in (0, None) and
self.team.score >= self.protocol.max_score):
self.protocol.reset_game(self)
self.protocol.on_game_end()
else:
intel_capture = loaders.IntelCapture()
intel_capture.player_id = self.player_id
intel_capture.winning = False
self.protocol.broadcast_contained(intel_capture, save=True)
flag = other_team.set_flag()
flag.update()
[docs] def drop_flag(self) -> None:
protocol = self.protocol
game_mode = protocol.game_mode
if game_mode == CTF_MODE:
for flag in (protocol.blue_team.flag, protocol.green_team.flag):
player = flag.player
if player is not self:
continue
position = self.world_object.position
# convert to safe coords so the flag can't be dropped out of bounds
x, y, z = self.protocol.map.get_safe_coords(
position.x, position.y, position.z)
# or inside solid
z = self.protocol.map.get_z(x, y, z)
flag.set(x, y, z)
flag.player = None
intel_drop = loaders.IntelDrop()
intel_drop.player_id = self.player_id
intel_drop.x = flag.x
intel_drop.y = flag.y
intel_drop.z = flag.z
self.protocol.broadcast_contained(intel_drop, save=True)
self.on_flag_drop()
break
elif game_mode == TC_MODE:
for entity in protocol.entities:
if self in entity.players:
entity.remove_player(self)
[docs] def on_disconnect(self) -> None:
if self.name is not None:
self.drop_flag()
player_left = loaders.PlayerLeft()
player_left.player_id = self.player_id
self.protocol.broadcast_contained(player_left, sender=self,
save=True)
del self.protocol.players[self.player_id]
if self.player_id is not None:
self.protocol.player_ids.put_back(self.player_id)
self.protocol.update_master()
self.reset()
[docs] def reset(self) -> None:
if self.spawn_call is not None:
self.spawn_call.cancel()
self.spawn_call = None
if self.world_object is not None:
self.world_object.delete()
self.world_object = None
if self.team is not None:
old_team = self.team
self.team = None
self.on_team_changed(old_team)
self.on_reset()
self.name = self.hp = self.world_object = None
[docs] def hit(self, value, by=None, kill_type=WEAPON_KILL):
if self.hp is None:
return
if by is not None and self.team is by.team:
friendly_fire = self.protocol.friendly_fire
friendly_fire_on_grief = self.protocol.friendly_fire_on_grief
if friendly_fire_on_grief and not friendly_fire:
if (kill_type == MELEE_KILL and
not self.protocol.spade_teamkills_on_grief):
return
hit_time = self.protocol.friendly_fire_time
if (self.last_block_destroy is None
or reactor.seconds() - self.last_block_destroy >= hit_time):
return
if not friendly_fire:
return
self.set_hp(self.hp - value, by, kill_type=kill_type)
[docs] def set_hp(self, value: Union[int, float], hit_by: Optional['ServerConnection'] = None, kill_type: int = WEAPON_KILL,
hit_indicator: Optional[Tuple[float, float, float]] = None, grenade: Optional[world.Grenade] = None) -> None:
value = int(value)
self.hp = max(0, min(100, value))
if self.hp <= 0:
self.kill(hit_by, kill_type, grenade)
return
set_hp = loaders.SetHP()
set_hp.hp = self.hp
set_hp.not_fall = int(kill_type != FALL_KILL)
if hit_indicator is None:
if hit_by is not None and hit_by is not self:
hit_indicator = hit_by.world_object.position.get()
else:
hit_indicator = (0, 0, 0)
x, y, z = hit_indicator
set_hp.source_x = x
set_hp.source_y = y
set_hp.source_z = z
self.send_contained(set_hp)
[docs] def set_weapon(self, weapon: int, local: bool = False, no_kill: bool = False) -> None:
self.weapon = weapon
if self.weapon_object is not None:
self.weapon_object.reset()
self.weapon_object = WEAPONS[weapon](self._on_reload)
if not local:
change_weapon = loaders.ChangeWeapon()
self.protocol.broadcast_contained(change_weapon, save=True)
if not no_kill:
self.kill(kill_type=CLASS_CHANGE_KILL)
[docs] def set_team(self, team):
if team is self.team:
return
self.drop_flag()
old_team = self.team
self.team = team
self.on_team_changed(old_team)
if old_team.spectator:
self.respawn()
else:
self.kill(kill_type=TEAM_CHANGE_KILL)
[docs] def kill(self, by: None = None, kill_type: int = WEAPON_KILL, grenade: None = None) -> None:
if self.hp is None:
return
if self.on_kill(by, kill_type, grenade) is False:
return
self.drop_flag()
self.hp = None
self.weapon_object.reset()
kill_action = loaders.KillAction()
kill_action.kill_type = kill_type
if by is None:
kill_action.killer_id = kill_action.player_id = self.player_id
else:
kill_action.killer_id = by.player_id
kill_action.player_id = self.player_id
if by is not None and by is not self:
by.add_score(1)
kill_action.respawn_time = self.get_respawn_time() + 1
self.protocol.broadcast_contained(kill_action, save=True)
self.world_object.dead = True
self.respawn()
[docs] def add_score(self, score):
self.kills += score
def _connection_ack(self) -> None:
self._send_connection_data()
self.send_map(ProgressiveMapGenerator(self.protocol.map))
if not self.client_info:
handshake_init = loaders.HandShakeInit()
self.send_contained(handshake_init)
def _send_connection_data(self) -> None:
saved_loaders = self.saved_loaders = []
if self.player_id is None:
for player in self.protocol.players.values():
if player.name is None:
continue
existing_player = loaders.ExistingPlayer()
existing_player.name = player.name
existing_player.player_id = player.player_id
existing_player.tool = player.tool or 0
existing_player.weapon = player.weapon
existing_player.kills = player.kills
existing_player.team = player.team.id
existing_player.color = make_color(*player.color)
saved_loaders.append(existing_player.generate())
self.player_id = self.protocol.player_ids.pop()
self.protocol.update_master()
# send initial data
blue = self.protocol.blue_team
green = self.protocol.green_team
state_data = loaders.StateData()
state_data.player_id = self.player_id
state_data.fog_color = self.protocol.fog_color
state_data.team1_color = blue.color
state_data.team1_name = blue.name
state_data.team2_color = green.color
state_data.team2_name = green.name
game_mode = self.protocol.game_mode
if game_mode == CTF_MODE:
blue_base = blue.base
blue_flag = blue.flag
green_base = green.base
green_flag = green.flag
ctf_data = loaders.CTFState()
ctf_data.cap_limit = self.protocol.max_score
ctf_data.team1_score = blue.score
ctf_data.team2_score = green.score
ctf_data.team1_base_x = blue_base.x
ctf_data.team1_base_y = blue_base.y
ctf_data.team1_base_z = blue_base.z
ctf_data.team2_base_x = green_base.x
ctf_data.team2_base_y = green_base.y
ctf_data.team2_base_z = green_base.z
if green_flag.player is None:
ctf_data.team1_has_intel = 0
ctf_data.team2_flag_x = green_flag.x
ctf_data.team2_flag_y = green_flag.y
ctf_data.team2_flag_z = green_flag.z
else:
ctf_data.team1_has_intel = 1
ctf_data.team2_carrier = green_flag.player.player_id
if blue_flag.player is None:
ctf_data.team2_has_intel = 0
ctf_data.team1_flag_x = blue_flag.x
ctf_data.team1_flag_y = blue_flag.y
ctf_data.team1_flag_z = blue_flag.z
else:
ctf_data.team2_has_intel = 1
ctf_data.team1_carrier = blue_flag.player.player_id
state_data.state = ctf_data
elif game_mode == TC_MODE:
state_data.state = tc_data
generated_data = state_data.generate()
saved_loaders.append(generated_data)
[docs] def grenade_exploded(self, grenade: world.Grenade) -> None:
if self.name is None or self.team.spectator:
return
if grenade.team is not None and grenade.team is not self.team:
# could happen if the player changed team
return
position = grenade.position
x = position.x
y = position.y
z = position.z
if x < 0 or x > 512 or y < 0 or y > 512 or z < 0 or z > 63:
return
x = int(math.floor(x))
y = int(math.floor(y))
z = int(math.floor(z))
for player_list in (self.team.other.get_players(), (self,)):
for player in player_list:
if not player.hp:
continue
damage = grenade.get_damage(player.world_object.position)
if damage == 0:
continue
self.on_unvalidated_hit(damage, player, GRENADE_KILL, grenade)
returned = self.on_hit(damage, player, GRENADE_KILL, grenade)
if returned == False:
continue
elif returned is not None:
damage = returned
player.set_hp(player.hp - damage, self,
hit_indicator=position.get(), kill_type=GRENADE_KILL,
grenade=grenade)
if self.on_block_destroy(x, y, z, GRENADE_DESTROY) == False:
return
map = self.protocol.map
for n_x, n_y, n_z in product(range(x - 1, x + 2), range(y - 1, y + 2), range(z - 1, z + 2)):
count = map.destroy_point(n_x, n_y, n_z)
if count:
self.total_blocks_removed += count
self.on_block_removed(n_x, n_y, n_z)
block_action = loaders.BlockAction()
block_action.x = x
block_action.y = y
block_action.z = z
block_action.value = GRENADE_DESTROY
block_action.player_id = self.player_id
self.protocol.broadcast_contained(block_action, save=True)
self.protocol.update_entities()
def _on_fall(self, damage: int) -> None:
if not self.hp:
return
returned = self.on_fall(damage)
if returned is False:
return
elif returned is not None:
damage = returned
self.set_hp(self.hp - damage, kill_type=FALL_KILL)
def _on_reload(self):
weapon_reload = loaders.WeaponReload()
weapon_reload.player_id = self.player_id
weapon_reload.clip_ammo = self.weapon_object.current_ammo
weapon_reload.reserve_ammo = self.weapon_object.current_stock
self.send_contained(weapon_reload)
[docs] def send_map(self, data: Optional[ProgressiveMapGenerator] = None) -> None:
if data is not None:
self.map_data = data
map_start = loaders.MapStart()
map_start.size = data.get_size()
self.send_contained(map_start)
elif self.map_data is None:
return
if not self.map_data.data_left():
log.debug("done sending map data to {player}", player=self)
self.map_data = None
for data in self.saved_loaders:
packet = enet.Packet(bytes(data), enet.PACKET_FLAG_RELIABLE)
self.peer.send(0, packet)
self.saved_loaders = None
self.on_join()
return
for _ in range(10):
if not self.map_data.data_left():
break
map_data = loaders.MapChunk()
map_data.data = self.map_data.read(8192)
self.send_contained(map_data)
[docs] def continue_map_transfer(self) -> None:
self.send_map()
[docs] def send_data(self, data):
self.protocol.transport.write(data, self.address)
[docs] def send_chat(self, value: str, global_message: bool = False, custom_type: int = CHAT_ALL) -> None:
if self.deaf:
return
chat_message = loaders.ChatMessage()
if custom_type > 2 and "client" in self.client_info:
if EXTENSION_CHATTYPE in self.proto_extensions:
chat_message.chat_type = custom_type
else:
value = OPENSPADES_CHATTYPES[custom_type] + value
chat_message.player_id = 35
prefix = ''
elif not global_message:
chat_message.chat_type = CHAT_SYSTEM
prefix = ''
else:
chat_message.chat_type = CHAT_TEAM
# 34 is guaranteed to be out of range!
chat_message.player_id = 35
prefix = self.protocol.server_prefix + ' '
lines = textwrap.wrap(value, MAX_CHAT_SIZE - len(prefix) - 1)
for line in lines:
chat_message.value = '{}{}'.format(prefix, line)
self.send_contained(chat_message)
[docs] def send_chat_warning(self, message):
"""
Send a warning message. This gets displayed as a yellow popup
with sound for OpenSpades/BetterSpades clients
"""
self.send_chat(message, custom_type=CHAT_WARNING)
[docs] def send_chat_notice(self, message):
"""
Send a notice. This gets displayed as a popup for OpenSpades/Betterspades
clients
"""
self.send_chat(message, custom_type=CHAT_INFO)
[docs] def send_chat_error(self, message):
"""
Send a error message. This gets displayed as a red popup with
sound for OpenSpades/Betterspades clients
"""
self.send_chat(message, custom_type=CHAT_ERROR)
[docs] def send_chat_status(self, message):
"""
Send a status message. This gets displayed in the center of the
screen for OpenSpades/Betterspades clients
"""
self.send_chat(message, custom_type=CHAT_BIG)
# events/hooks
[docs] def on_join(self):
pass
[docs] def on_login(self, name):
pass
[docs] def on_spawn(self, pos: Tuple[float, float, float]) -> None:
pass
[docs] def on_spawn_location(self, pos: Tuple[float, float, float]) -> None:
pass
[docs] def on_chat(self, value, global_message):
pass
[docs] def on_chat_sent(self, value: str, global_message: bool) -> None:
pass
[docs] def on_command(self, command, parameters):
pass
[docs] def on_hit(self, hit_amount, hit_player, kill_type, grenade):
pass
[docs] def on_unvalidated_hit(self, hit_amount, hit_player, kill_type, grenade):
pass
[docs] def on_kill(self, killer, kill_type, grenade):
pass
[docs] def on_team_join(self, team):
pass
[docs] def on_team_changed(self, old_team: Team) -> None:
pass
[docs] def on_grenade(self, time_left):
pass
[docs] def on_grenade_thrown(self, grenade: world.Grenade) -> None:
pass
[docs] def on_block_build_attempt(self, x, y, z):
pass
[docs] def on_block_build(self, x, y, z):
pass
[docs] def on_line_build_start(self):
"""called when the player has pressed the mouse button to start
line-building"""
[docs] def on_line_build_attempt(self, points):
pass
[docs] def on_line_build(self, points):
pass
[docs] def on_block_destroy(self, x, y, z, mode):
pass
[docs] def on_block_removed(self, x, y, z):
pass
[docs] def on_refill(self):
pass
[docs] def on_color_set_attempt(self, color: Tuple[int, int, int]) -> None:
pass
[docs] def on_color_set(self, color: Tuple[int, int, int]) -> None:
pass
[docs] def on_flag_take(self):
pass
[docs] def on_flag_capture(self):
pass
[docs] def on_flag_drop(self):
pass
[docs] def on_hack_attempt(self, reason):
pass
[docs] def on_position_update(self) -> None:
pass
[docs] def on_weapon_set(self, value):
pass
[docs] def on_fall(self, damage):
pass
[docs] def on_reset(self):
pass
[docs] def on_orientation_update(self, x: float, y: float, z: float) -> None:
pass
[docs] def on_shoot_set(self, fire: int) -> None:
pass
[docs] def on_secondary_fire_set(self, secondary):
pass
[docs] def on_walk_update(self, up: bool, down: bool, left: bool, right: bool) -> None:
pass
[docs] def on_animation_update(self, jump, crouch, sneak, sprint):
pass
def __repr__(self):
return "<{} player_id: {!r}, name: {!r}, address: {!r} at 0x{:x}>".format(
self.__class__.__name__, self.player_id, self.name, self.address,
id(self)
)