nise/nise-circleguard/src/WriteStreamWrapper.py

111 lines
3.7 KiB
Python
Raw Normal View History

2024-02-14 16:43:11 +00:00
import base64
import hashlib
import struct
from datetime import datetime
class WriteStreamWrapper:
    """Write little-endian binary primitives to a wrapped byte stream.

    The field layout produced by :meth:`write_osr_data2` (mode byte, version
    integer, ``0x0b``-prefixed strings, .NET tick timestamp, ...) appears to
    target the osu! ``.osr`` replay format -- confirm against the consumer.

    Args:
        stream: Any object exposing ``write(bytes)`` (and ``close()`` when
            ``stream_is_closable`` is true).
        stream_is_closable: Whether :meth:`end` should close ``stream``.
    """

    # Seconds between 0001-01-01T00:00:00 and the Unix epoch.
    _SECONDS_0001_TO_1970 = 62135596800
    # A .NET tick is 100 ns.
    _TICKS_PER_SECOND = 10_000_000
    _TICKS_PER_MICROSECOND = 10

    def __init__(self, stream, stream_is_closable):
        self.stream = stream
        self.stream_is_closable = stream_is_closable

    def write(self, data: bytes) -> None:
        """Write raw bytes to the underlying stream unchanged."""
        self.stream.write(data)

    @staticmethod
    def _encode_uleb128(value: int) -> bytes:
        """Return *value* encoded as an unsigned LEB128 byte sequence.

        Raises:
            ValueError: If *value* is negative.
        """
        if value < 0:
            raise ValueError("ULEB128 cannot encode a negative value")
        encoded = bytearray()
        while True:
            septet = value & 0x7F
            value >>= 7
            if value:
                # More groups follow: set the continuation bit.
                encoded.append(septet | 0x80)
            else:
                encoded.append(septet)
                return bytes(encoded)

    def write_string(self, string: str) -> None:
        """Write a UTF-8 string as ``0x0b``, a ULEB128 byte length, then the bytes.

        The length used to be emitted as a single raw byte, which raised
        ``OverflowError`` for strings of 256+ bytes and produced an invalid
        ULEB128 value for 128-255 bytes. Output for strings shorter than
        128 bytes is unchanged.
        """
        str_bytes = string.encode('utf-8')
        str_header = b'\x0b' + self._encode_uleb128(len(str_bytes))
        self.stream.write(str_header)
        self.stream.write(str_bytes)

    def write_byte(self, byte: int) -> None:
        """Write one unsigned 8-bit integer."""
        self.stream.write(struct.pack('B', byte))

    def write_short(self, short: int) -> None:
        """Write one unsigned 16-bit little-endian integer."""
        self.stream.write(struct.pack('<H', short))

    def write_integer(self, integer: int) -> None:
        """Write one unsigned 32-bit little-endian integer."""
        self.stream.write(struct.pack('<I', integer))

    def write_long(self, low: int, high: int) -> None:
        """Write a 64-bit little-endian value given as two unsigned 32-bit halves.

        Args:
            low: Least-significant 32 bits (written first).
            high: Most-significant 32 bits.
        """
        self.stream.write(struct.pack('<II', low, high))

    def write_long_with_big_int(self, bigint: int) -> None:
        """Write a non-negative integer below 2**64 as a 64-bit little-endian value.

        Raises:
            struct.error: If the upper half does not fit an unsigned 32-bit
                field (negative input or a value >= 2**64).
        """
        low = bigint & 0xffffffff
        high = bigint >> 32
        self.write_long(low, high)

    def write_dot_net_date_ticks(self, date: datetime) -> None:
        """Write *date* as a 64-bit count of 100 ns ticks since 0001-01-01.

        The tick count is computed with integer arithmetic; the previous
        float-based split lost the low bits of the value. As with
        ``datetime.timestamp()``, a naive *date* is interpreted as local
        time and an aware one is converted from its own offset.
        """
        # With microseconds stripped the timestamp is an integer-valued float,
        # so rounding recovers the exact second count.
        whole_seconds = round(date.replace(microsecond=0).timestamp())
        ticks = (
            (whole_seconds + self._SECONDS_0001_TO_1970) * self._TICKS_PER_SECOND
            + date.microsecond * self._TICKS_PER_MICROSECOND
        )
        self.write_long_with_big_int(ticks)

    def write_osr_data2(self, replay_data: str, mods: int):
        """Write a replay record with placeholder score metadata.

        Only the mods bitmask and the replay payload come from the caller;
        every other field (beatmap hash, player name, hit counts, score,
        date, score id) is a fixed placeholder.

        Args:
            replay_data: Base64-encoded replay payload; it is decoded and
                written length-prefixed.
            mods: Enabled-mods value, written as an unsigned 32-bit integer.
        """
        data = {
            'gameMode': 0,
            'beatmapHash': 'beatmap_hash',
            'score': {
                'username': '',
                'maxcombo': 0,
                'count300': 0,
                'count100': 0,
                'count50': 0,
                'countgeki': 0,
                'countkatu': 0,
                'countmiss': 0,
                'score': 0,
                'rank': '',
                'perfect': True,
                'enabled_mods': mods,
                'date': '2024-03-05 12:15:36.000',
                'score_id': 0
            },
            'replayData': replay_data
        }
        score = data['score']

        self.write_byte(data['gameMode'])
        self.write_integer(20241228)
        self.write_string(data['beatmapHash'])
        self.write_string('Anonymous')

        replay_hash_data = f"{score['maxcombo']}osu{score['username']}{data['beatmapHash']}{score['score']}{score['rank']}"
        replay_hash = hashlib.md5(replay_hash_data.encode()).hexdigest()
        self.write_string(replay_hash)

        for key in ('count300', 'count100', 'count50', 'countgeki', 'countkatu', 'countmiss'):
            self.write_short(int(score[key]))

        self.write_integer(int(score['score']))
        self.write_short(int(score['maxcombo']))
        self.write_byte(int(score['perfect']))
        self.write_integer(int(score['enabled_mods']))
        # Empty string field between the mods and the timestamp.
        self.write_string('')

        replay_date = self.parse_date(str(score['date']))
        self.write_dot_net_date_ticks(replay_date)

        replay_data_decoded = base64.b64decode(data['replayData'])
        self.write_integer(len(replay_data_decoded))
        self.write(replay_data_decoded)
        self.write_long_with_big_int(int(score['score_id']))

    @staticmethod
    def parse_date(date_str):
        """Parse an ISO-8601-like date string into a ``datetime``.

        Exactly one normalisation is applied before parsing:
        a ``Z`` alongside an explicit ``+`` offset is dropped; otherwise a
        ``Z`` becomes ``+00:00``; otherwise spaces become ``T``.

        Raises:
            ValueError: If the normalised string is not accepted by
                ``datetime.fromisoformat``.
        """
        if 'Z' in date_str and '+' in date_str:
            date_str = date_str.replace('Z', '')
        elif 'Z' in date_str:
            date_str = date_str.replace('Z', '+00:00')
        elif ' ' in date_str:
            date_str = date_str.replace(' ', 'T')
        return datetime.fromisoformat(date_str)

    def end(self):
        """Close the underlying stream if it was declared closable."""
        if self.stream_is_closable:
            self.stream.close()